// Constructs a Worker that represents a trigger (isTrigger = true).
// Stores the callback (context + function), the min/max thread requirements, the event ID,
// the composition hash, and the number of chains (chainCount).
// The chain-specific fields are given placeholder values: chainIndex = ULONG_MAX and an
// empty upstream-chain list (NULL / 0).
// NOTE(review): this listing omits some lines; no assignment of the `queue` parameter is
// visible here — confirm against the full file that it is stored.
21 VuoThreadManager::Worker::Worker(dispatch_queue_t queue,
void *context,
void (*
function)(
void *),
22 int minThreadsNeeded,
int maxThreadsNeeded,
23 unsigned long eventId,
unsigned long compositionHash,
int chainCount)
26 this->context = context;
27 this->
function =
function;
28 this->isTrigger =
true;
29 this->minThreadsNeeded = minThreadsNeeded;
30 this->maxThreadsNeeded = maxThreadsNeeded;
31 this->eventId = eventId;
32 this->compositionHash = compositionHash;
// Placeholder values for the fields that the other constructor overload fills in.
33 this->chainIndex = ULONG_MAX;
34 this->chainCount = chainCount;
35 this->upstreamChainIndices = NULL;
36 this->upstreamChainIndicesCount = 0;
// Constructs a Worker that represents a chain (isTrigger = false).
// Stores the callback (context + function), the min/max thread requirements, the event ID,
// the composition hash, this chain's index, and the list of upstream chain indices.
// chainCount is set to -1 as a placeholder.
// The upstreamChainIndices pointer is stored as-is (not copied); dequeueWorkers() later
// releases it with free(), so callers presumably must pass a malloc-allocated array — confirm.
// NOTE(review): this listing omits some lines; no assignment of the `queue` parameter is
// visible here — confirm against the full file that it is stored.
42 VuoThreadManager::Worker::Worker(dispatch_queue_t queue,
void *context,
void (*
function)(
void *),
43 int minThreadsNeeded,
int maxThreadsNeeded,
44 unsigned long eventId,
unsigned long compositionHash,
unsigned long chainIndex,
45 unsigned long *upstreamChainIndices,
int upstreamChainIndicesCount)
48 this->context = context;
49 this->
function =
function;
50 this->isTrigger =
false;
51 this->minThreadsNeeded = minThreadsNeeded;
52 this->maxThreadsNeeded = maxThreadsNeeded;
53 this->eventId = eventId;
54 this->compositionHash = compositionHash;
55 this->chainIndex = chainIndex;
// Placeholder: the chain count is only supplied through the trigger overload.
56 this->chainCount = -1;
// Pointer is adopted, not copied.
57 this->upstreamChainIndices = upstreamChainIndices;
58 this->upstreamChainIndicesCount = upstreamChainIndicesCount;
// WorkerQueue constructor. Its body is not visible in this listing.
64 VuoThreadManager::WorkerQueue::WorkerQueue(
void)
// Adds a worker to the queue by wrapping it in a newly heap-allocated Node.
// The lines that link the node into the list are not visible in this listing.
// NOTE(review): the Node is created with raw `new`; the matching `delete` is not visible
// here — confirm where nodes are released.
74 void VuoThreadManager::WorkerQueue::enqueue(Worker *worker)
76 Node *node =
new Node;
77 node->worker = worker;
// Unlinks `node` from the queue by pointing its neighbours at each other
// (the visible lines patch next->prev and prev->next).
// NOTE(review): the conditions guarding these two statements (e.g. for a node at either end
// of the list) are not visible in this listing — confirm against the full file.
97 void VuoThreadManager::WorkerQueue::dequeue(Node *node)
102 node->next->prev = node->prev;
107 node->prev->next = node->next;
// Walks the list from `first` through each node's `next` pointer until NULL.
// Presumably counts the nodes visited and returns that count; the loop body and the
// return statement are not visible in this listing.
144 size_t VuoThreadManager::WorkerQueue::size(
void)
147 for (Node *curr = first; curr != NULL; curr = curr->next)
// Constructs a thread pool with no threads available.
// Initialization of any other members is not visible in this listing.
155 VuoThreadManager::ThreadPool::ThreadPool(
void)
158 threadsAvailable = 0;
// Sets the pool's capacity and resets the available count to that same capacity.
// Threads already recorded as claimed are not subtracted, so this is presumably meant to be
// called before any worker has claimed threads from this pool — confirm with callers.
165 void VuoThreadManager::ThreadPool::setTotalThreads(
int totalThreads)
167 this->totalThreads = totalThreads;
168 this->threadsAvailable = totalThreads;
// Attempts to reserve threads from this pool for the worker identified by `workerId`.
// When at least `minThreadsNeeded` threads are available, claims as many as possible up to
// `maxThreadsNeeded`, writes the number to `threadsClaimed`, deducts it from the available
// count, and records it under `workerId`.
// The return statements and the behaviour when too few threads are available are not
// visible in this listing; presumably returns true on success and false otherwise.
175 bool VuoThreadManager::ThreadPool::tryClaimThreads(
int minThreadsNeeded,
int maxThreadsNeeded,
unsigned long workerId,
int &threadsClaimed)
177 if (minThreadsNeeded <= threadsAvailable)
// If maxThreadsNeeded is smaller than minThreadsNeeded, fewer than the minimum are claimed.
179 threadsClaimed = min(threadsAvailable, maxThreadsNeeded);
180 threadsAvailable -= threadsClaimed;
// Overwrites any earlier record for the same workerId.
181 workersClaimingThreads[workerId] = threadsClaimed;
// Looks up the number of threads recorded for `workerId` in workersClaimingThreads.
// The statement executed when the entry is found is not visible in this listing
// (presumably it returns the recorded count).
// Throws VuoException, presumably when no entry exists for `workerId`.
196 int VuoThreadManager::ThreadPool::getThreadsClaimed(
unsigned long workerId)
198 map<unsigned long, int>::iterator iter = workersClaimingThreads.find(workerId);
199 if (iter != workersClaimingThreads.end())
202 throw VuoException(
"Couldn't find worker in thread pool to get its claimed threads.");
// Gives back the threads that `workerId` had claimed from this pool.
// Adds the recorded count to threadsAvailable, records the worker as completed (only when
// totalWorkers > 0), and removes its claim record.
// Throws VuoException, presumably when no claim is recorded for `workerId`; the exact branch
// structure is not visible in this listing.
210 void VuoThreadManager::ThreadPool::returnThreads(
unsigned long workerId)
212 map<unsigned long, int>::iterator iter = workersClaimingThreads.find(workerId);
213 if (iter != workersClaimingThreads.end())
215 threadsAvailable += iter->second;
// Completion is tracked only for pools that have a positive worker total.
216 if (totalWorkers > 0)
217 workersCompleted.insert(iter->first);
// Erase last: the iterator is read by the statements above.
218 workersClaimingThreads.erase(iter);
221 throw VuoException(
"Couldn't find worker in thread pool to return its threads.");
// Fragment of an initialization routine (presumably the VuoThreadManager constructor; its
// header line is not visible in this listing).
// Sizes the main thread pool to 60 threads and creates two semaphores with an initial
// count of 0, so the first wait on either blocks until it is signaled.
230 mainThreadPool.setTotalThreads(60);
233 workersUpdated = dispatch_semaphore_create(0);
234 completed = dispatch_semaphore_create(0);
// Fragment of a cleanup routine (presumably the VuoThreadManager destructor; its header
// line is not visible in this listing). Releases the dispatch objects owned by the manager.
// NOTE(review): no release of the `completed` semaphore is visible in this listing —
// confirm against the full file that it is released too.
242 dispatch_release(workersWaitingSync);
243 dispatch_release(threadPoolSync);
244 dispatch_release(workersUpdated);
// Fragment of the routine that starts the scheduling loop (header line not visible in this
// listing). Marks enqueueing and dequeueing as allowed, then asynchronously runs a loop on
// the high-priority global queue.
252 mayMoreWorkersBeEnqueued =
true;
253 mayMoreWorkersBeDequeued =
true;
255 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
256 dispatch_async(queue, ^{
// Keep going until dequeueWorkers() clears mayMoreWorkersBeDequeued.
257 while (mayMoreWorkersBeDequeued)
259 vector<Worker *> workers = dequeueWorkers();
// Start each dequeued worker on its own queue. The definition of `w` is not visible in
// this listing (presumably `*i`).
261 for (vector<Worker *>::iterator i = workers.begin(); i != workers.end(); ++i)
264 dispatch_async_f(w->queue, w->context, w->function);
// Presumably reached after the loop exits; the scheduler-stop code waits on this semaphore.
269 dispatch_semaphore_signal(completed);
// Fragment of the routine that stops the scheduling loop (header line not visible in this
// listing).
// Forbid further enqueueing; the flag is changed on the workersWaitingSync queue.
278 dispatch_sync(workersWaitingSync, ^{
279 mayMoreWorkersBeEnqueued =
false;
// Wake the scheduling loop so it can observe the flag, then block until it signals `completed`.
282 dispatch_semaphore_signal(workersUpdated);
283 dispatch_semaphore_wait(completed, DISPATCH_TIME_FOREVER);
// Discard the main pool's record of completed workers.
285 mainThreadPool.workersCompleted.clear();
// Selects the waiting workers that can start now, removes them from the waiting queue, and
// returns them. Blocks on the workersUpdated semaphore, repeating until at least one worker
// has been dequeued or mayMoreWorkersBeDequeued becomes false.
// NOTE(review): this listing omits braces and some lines, so the branch nesting described in
// the comments below is inferred from statement order — confirm against the full file.
301 vector<VuoThreadManager::Worker *> VuoThreadManager::dequeueWorkers(
void)
// Start with empty result lists (these are not declared locally in the visible lines, so
// they are presumably members reused across calls).
303 workersDequeued.clear();
304 workersDequeuedNodes.clear();
306 while (workersDequeued.empty() && mayMoreWorkersBeDequeued)
// Sleep until a worker is enqueued, threads are returned, or scheduling is being disabled.
308 dispatch_semaphore_wait(workersUpdated, DISPATCH_TIME_FOREVER);
// Capture the current ends of the waiting list on the workersWaitingSync queue.
310 __block WorkerQueue::Node *newest;
311 __block WorkerQueue::Node *oldest;
312 dispatch_sync(workersWaitingSync, ^{
313 newest = workersWaitingForThreads.getNewest();
314 oldest = workersWaitingForThreads.getOldest();
// Thread-pool state is examined and modified on the threadPoolSync queue.
319 dispatch_sync(threadPoolSync, ^{
320 bool hasTriggerBeenSkipped =
false;
// Visit waiting workers starting from the oldest; `newest` is passed to getNextOldest(),
// presumably as the end bound of the traversal.
322 for (WorkerQueue::Node *n = oldest; n != NULL; n = workersWaitingForThreads.getNextOldest(newest, n))
324 Worker *w = n->worker;
// Worker with non-negative thread requirements (this appears to be on the trigger path).
328 if (w->minThreadsNeeded >= 0 && w->maxThreadsNeeded >= 0)
// After one trigger has failed to get threads, later triggers are not attempted in this
// pass — presumably so that triggers are served in arrival order.
332 if (! hasTriggerBeenSkipped)
// Trigger workers claim from the main pool, keyed by event ID.
335 bool gotThreads = mainThreadPool.tryClaimThreads(w->minThreadsNeeded, w->maxThreadsNeeded, w->eventId, threadsClaimed);
// Set up the per-event, per-composition pool with the threads just claimed
// (operator[] on a std::map creates the entry if it is absent).
338 ThreadPool &triggerThreadPool = triggerThreadPools[w->eventId][w->compositionHash];
339 triggerThreadPool.setTotalThreads(threadsClaimed);
340 triggerThreadPool.totalWorkers = w->chainCount;
341 workersDequeuedNodes.push_back(n);
// Presumably the branch taken when the claim above failed.
345 hasTriggerBeenSkipped = true;
// Presumably the branch for a worker with negative thread requirements: it is dequeued
// without claiming any threads.
353 triggerThreadPools[w->eventId][w->compositionHash].totalWorkers = w->chainCount;
354 workersDequeuedNodes.push_back(n);
// Chain-worker path (presumably the non-trigger branch): claims come from the trigger's pool.
361 ThreadPool &triggerThreadPool = triggerThreadPools[w->eventId][w->compositionHash];
// A chain may start only after every upstream chain is recorded in workersCompleted.
363 bool haveAllUpstreamChainsCompleted =
true;
364 for (
int i = 0; i < w->upstreamChainIndicesCount; ++i)
366 unsigned long index = w->upstreamChainIndices[i];
368 set<unsigned long>::iterator iter = triggerThreadPool.workersCompleted.find( index );
369 if (iter == triggerThreadPool.workersCompleted.end())
371 haveAllUpstreamChainsCompleted = false;
376 if (haveAllUpstreamChainsCompleted)
// Chain workers are keyed by chain index within the trigger's pool.
379 bool gotThreads = triggerThreadPool.tryClaimThreads(w->minThreadsNeeded, w->maxThreadsNeeded, w->chainIndex, threadsClaimed);
382 workersDequeuedNodes.push_back(n);
// The upstream list is released with free() and the pointer cleared so it is not reused.
384 free(w->upstreamChainIndices);
385 w->upstreamChainIndices = NULL;
// On the workersWaitingSync queue: move the selected workers to the result list and
// unlink their nodes from the waiting list.
396 dispatch_sync(workersWaitingSync, ^{
397 for (vector<WorkerQueue::Node *>::iterator i = workersDequeuedNodes.begin(); i != workersDequeuedNodes.end(); ++i)
399 WorkerQueue::Node *n = *i;
400 workersDequeued.push_back(n->worker);
401 workersWaitingForThreads.dequeue(n);
// Once enqueueing is disabled and the waiting list is empty, stop dequeueing.
404 if (! mayMoreWorkersBeEnqueued && workersWaitingForThreads.first == NULL) {
405 mayMoreWorkersBeDequeued = false;
// Returned by value, so the caller receives a copy of the list.
410 return workersDequeued;
// Fragment of the routine that queues a trigger worker (the line carrying the function name
// and its first parameters is not visible in this listing).
// `compositionHash`, used below, is computed on a line not shown here — presumably from
// compositionIdentifier.
422 int minThreadsNeeded,
int maxThreadsNeeded,
unsigned long eventId,
423 const char *compositionIdentifier,
int chainCount)
// Cap the minimum at the main pool's capacity: tryClaimThreads() only succeeds when the
// minimum is no greater than the available count, which setTotalThreads() sets to capacity.
426 int adjustedMinThreadsNeeded = minThreadsNeeded;
427 if (adjustedMinThreadsNeeded > mainThreadPool.totalThreads) {
428 adjustedMinThreadsNeeded = mainThreadPool.totalThreads;
429 VUserLog(
"Warning: Couldn't allocate as many threads to a trigger worker as it requires.");
// NOTE(review): the Worker is created with raw `new`; no matching `delete` is visible in
// this listing — confirm where it is released.
433 Worker *worker =
new Worker(queue, context,
function, adjustedMinThreadsNeeded, maxThreadsNeeded, eventId, compositionHash, chainCount);
// The waiting list is modified on the workersWaitingSync queue.
435 dispatch_sync(workersWaitingSync, ^{
436 workersWaitingForThreads.enqueue(worker);
// Wake the scheduling loop so it re-evaluates the waiting workers.
439 dispatch_semaphore_signal(workersUpdated);
// Fragment of the routine that queues a chain worker (the line carrying the function name
// and its first parameters is not visible in this listing).
// `compositionHash`, used below, is computed on a line not shown here — presumably from
// compositionIdentifier.
448 int minThreadsNeeded,
int maxThreadsNeeded,
unsigned long eventId,
const char *compositionIdentifier,
449 unsigned long chainIndex,
unsigned long *upstreamChainIndices,
int upstreamChainIndicesCount)
// The upstreamChainIndices pointer is handed to the Worker, which stores it without copying.
// NOTE(review): the Worker is created with raw `new`; no matching `delete` is visible in
// this listing — confirm where it is released.
452 Worker *worker =
new Worker(queue, context,
function, minThreadsNeeded, maxThreadsNeeded, eventId, compositionHash, chainIndex, upstreamChainIndices, upstreamChainIndicesCount);
// The waiting list is modified on the workersWaitingSync queue.
454 dispatch_sync(workersWaitingSync, ^{
455 workersWaitingForThreads.enqueue(worker);
// Wake the scheduling loop so it re-evaluates the waiting workers.
458 dispatch_semaphore_signal(workersUpdated);
// Fragment of a routine that claims threads for a chain directly, without going through the
// waiting queue (the function name and earlier parameters are not visible in this listing).
467 unsigned long chainIndex)
// Claim from the pool for this event and composition, keyed by chain index.
471 dispatch_sync(threadPoolSync, ^{
472 ThreadPool &triggerThreadPool = triggerThreadPools[eventId][compositionHash];
474 bool gotThreads = triggerThreadPool.tryClaimThreads(minThreadsNeeded, maxThreadsNeeded, chainIndex, threadsClaimed);
// Presumably thrown when the claim fails.
// NOTE(review): as far as this listing shows, the throw sits inside the dispatch_sync block;
// confirm that an exception leaving a dispatch block is handled safely here.
476 throw VuoException(
"Not enough threads available in the thread pool to execute the chain.");
// Wake the scheduling loop, since the pool's available count has changed.
480 dispatch_semaphore_signal(workersUpdated);
// Fragment of a routine that sizes a subcomposition's thread pool from the threads that a
// chain has claimed (the function name and earlier parameters are not visible in this listing).
// `compositionHash` and `subcompositionHash` are computed on lines not shown here.
489 const char *subcompositionIdentifier)
494 dispatch_sync(threadPoolSync, ^{
495 map<unsigned long, ThreadPool> &eventThreadPools = triggerThreadPools[eventId];
// getThreadsClaimed() throws VuoException if the chain has no recorded claim.
496 int threadsClaimedForChain = eventThreadPools[compositionHash].getThreadsClaimed(chainIndex);
// The subcomposition's pool gets exactly as many threads as the chain claimed.
497 eventThreadPools[subcompositionHash].setTotalThreads(threadsClaimedForChain);
// Fragment of a routine that releases everything held for an event (header line not visible
// in this listing). On the threadPoolSync queue, removes all pools stored for the event and
// returns the event's threads to the main pool.
506 dispatch_sync(threadPoolSync, ^{
507 triggerThreadPools.erase(eventId);
// returnThreads() throws VuoException if the main pool has no claim recorded for eventId.
508 mainThreadPool.returnThreads(eventId);
// Wake the scheduling loop, since threads have become available.
511 dispatch_semaphore_signal(workersUpdated);
// Fragment of a routine that returns a chain's threads to its trigger's pool (header line
// not visible in this listing). When the chain was the last one outstanding, also tears
// down the pool(s) and returns the event's threads to the main pool.
521 dispatch_sync(threadPoolSync, ^{
522 ThreadPool &triggerThreadPool = triggerThreadPools[eventId][compositionHash];
523 triggerThreadPool.returnThreads(chainIndex);
// All workers of this pool are done when the completed set reaches totalWorkers.
// NOTE(review): size() is unsigned; totalWorkers is assigned from an int chainCount
// elsewhere — confirm the comparison behaves as intended if totalWorkers can be negative.
527 set<unsigned long> &workersCompleted = triggerThreadPool.workersCompleted;
528 if (workersCompleted.size() == triggerThreadPool.totalWorkers)
530 map<unsigned long, ThreadPool> &eventThreadPools = triggerThreadPools[eventId];
// After this erase the triggerThreadPool and workersCompleted references are invalid;
// they are not used again in the visible lines.
531 eventThreadPools.erase(compositionHash);
// No pools remain for the event: release the event's threads back to the main pool.
532 if (eventThreadPools.empty())
534 triggerThreadPools.erase(eventId);
535 mainThreadPool.returnThreads(eventId);
// Wake the scheduling loop, since threads have become available.
540 dispatch_semaphore_signal(workersUpdated);
// Fragment of a wrapper that forwards its arguments to a trigger-worker scheduling call.
// The wrapper's name, its earlier parameters, and the callee are not visible in this listing;
// `compositionIdentifier` is obtained on a line not shown here.
550 int minThreadsNeeded,
int maxThreadsNeeded,
unsigned long eventId,
int chainCount)
555 minThreadsNeeded, maxThreadsNeeded, eventId,
556 compositionIdentifier, chainCount);
// Fragment of a wrapper that forwards its arguments to a chain-worker scheduling call.
// The wrapper's name, its earlier parameters, and the callee are not visible in this listing;
// `compositionIdentifier` is obtained on a line not shown here.
563 int minThreadsNeeded,
int maxThreadsNeeded,
unsigned long eventId,
unsigned long chainIndex,
564 unsigned long *upstreamChainIndices,
int upstreamChainIndicesCount)
569 minThreadsNeeded, maxThreadsNeeded, eventId, compositionIdentifier,
570 chainIndex, upstreamChainIndices, upstreamChainIndicesCount);
// Parameter-list fragment of a function taking thread requirements, an event ID, and a
// chain index. Its name and body are not visible in this listing.
577 int minThreadsNeeded,
int maxThreadsNeeded,
unsigned long eventId,
unsigned long chainIndex)
// Fragment of a wrapper that passes `subcompositionIdentifier` as the last argument of a
// call. The wrapper's name, its other parameters, and the callee are not visible in this listing.
589 char *subcompositionIdentifier)
594 subcompositionIdentifier);