Vuo 2.4.4
Loading...
Searching...
No Matches
VuoThreadManager.hh
Go to the documentation of this file.
1
10#pragma once
11
12#include <dispatch/dispatch.h>
13#include <map>
14#include <set>
15#include <vector>
16using namespace std;
17
18#include "VuoCompositionState.h"
19#include "VuoHeap.h"
20
30{
31private:
/**
 * Plain record of one unit of work together with its scheduling metadata.
 *
 * Every member is public and no accessors are declared. Two constructor overloads exist:
 * one whose parameter list ends in `chainCount`, and one whose parameter list ends in
 * `chainIndex` plus an upstream-chain-index array and its length. Their definitions are
 * not part of this header.
 */
35 class Worker
36 {
37 public:
// Callback triple: a dispatch queue, an opaque pointer, and a function taking `void *`.
// Presumably `function` is submitted to `queue` with `context` as its argument —
// TODO confirm in the implementation.
38 dispatch_queue_t queue;
39 void *context;
40 void (*function)(void *);
41
// NOTE(review): neither constructor takes this flag as a parameter, so it is presumably
// derived from which overload built the worker — confirm in the implementation.
42 bool isTrigger;
43
// Thread-count bounds supplied to both constructors.
44 int minThreadsNeeded;
45 int maxThreadsNeeded;
46
// Identifiers supplied to both constructors. `compositionHash` is an integer, whereas the
// enclosing class's scheduling methods accept a `const char *compositionIdentifier`;
// presumably the hash is computed from that string — TODO confirm.
47 unsigned long eventId;
48 unsigned long compositionHash;
49
// Only the first constructor overload receives this value.
50 int chainCount;
51
// Only the second constructor overload receives these values.
// `upstreamChainIndices` is a raw pointer paired with an element count; this header does
// not show who owns the array or whether the constructor copies it.
52 unsigned long chainIndex;
53 unsigned long *upstreamChainIndices;
54 int upstreamChainIndicesCount;
55
// Overload taking `chainCount` (no chain index or upstream list).
56 Worker(dispatch_queue_t queue, void *context, void (*function)(void *), int minThreadsNeeded, int maxThreadsNeeded,
57 unsigned long eventId, unsigned long compositionHash, int chainCount);
// Overload taking `chainIndex` and the upstream chain indices (no chain count).
58 Worker(dispatch_queue_t queue, void *context, void (*function)(void *), int minThreadsNeeded, int maxThreadsNeeded,
59 unsigned long eventId, unsigned long compositionHash, unsigned long chainIndex,
60 unsigned long *upstreamChainIndices, int upstreamChainIndicesCount);
61 };
62
/**
 * Container of `Worker` pointers addressed through `Node` handles.
 *
 * The class exposes `first` and `last` node pointers, insertion by worker pointer,
 * removal by node pointer, and lookups for the oldest/newest node and for the
 * "next oldest" node relative to a given one. Presumably this is a linked list ordered
 * by insertion time — TODO confirm against the implementation.
 */
70 class WorkerQueue
71 {
72 public:
/**
 * Handle for one queued worker.
 *
 * NOTE(review): the listing numbering skips from 78 to 81 inside this struct, so this
 * view may not show every line of the original definition.
 */
76 struct Node
77 {
78 Worker *worker;
81 };
82
// End-of-queue node pointers; which end holds the oldest entry is not visible here.
83 Node *first;
84 Node *last;
85
86 WorkerQueue(void);
// Adds a worker, given by pointer; ownership of the pointee is not expressed in this header.
87 void enqueue(Worker *worker);
// Removes the entry identified by `node` (a handle, not a worker pointer).
88 void dequeue(Node *node);
// Returns a node chosen relative to `newest` and `node`; the exact traversal rule and the
// end-of-queue return value are defined in the implementation — TODO confirm.
89 Node * getNextOldest(Node *newest, Node *node);
90 Node * getOldest(void);
91 Node * getNewest(void);
// Returns the element count as `size_t`.
92 size_t size(void);
93 };
94
/**
 * Bookkeeping for a pool of threads that workers claim and later return.
 *
 * All state is public: a total and an available thread count, a per-worker-id map of
 * integers, a worker total, and a set of worker ids. No synchronization primitive is
 * declared inside this class.
 */
100 class ThreadPool
101 {
102 public:
103 int totalThreads;
104 int threadsAvailable;
// Keyed by worker id (the `workerId` parameter type used below); the mapped `int` is
// presumably the number of threads that worker currently holds — TODO confirm.
105 map<unsigned long, int> workersClaimingThreads;
106
107 int totalWorkers;
// Set of worker ids; presumably those that have finished — TODO confirm.
108 set<unsigned long> workersCompleted;
109
110 ThreadPool(void);
111 void setTotalThreads(int totalThreads);
// Attempts a claim within [minThreadsNeeded, maxThreadsNeeded] for `workerId`.
// The boolean result is tagged with VuoWarnUnusedResult (a macro defined outside this
// listing), and `threadsClaimed` is an output passed by non-const reference.
112 bool tryClaimThreads(int minThreadsNeeded, int maxThreadsNeeded, unsigned long workerId, int &threadsClaimed) VuoWarnUnusedResult;
113 int getThreadsClaimed(unsigned long workerId);
114 void returnThreads(unsigned long workerId);
115 };
116
// Private state of the enclosing class (its `class` header line is not visible in this listing).

// Queue of workers, and one stand-alone thread pool.
117 WorkerQueue workersWaitingForThreads;
118 ThreadPool mainThreadPool;
// Two-level map from `unsigned long` to `unsigned long` to ThreadPool. The key meanings
// are not stated here; presumably event id and composition hash, in some order — TODO confirm.
119 map<unsigned long, map<unsigned long, ThreadPool> > triggerThreadPools;
// Dispatch queues named as synchronization points; presumably serial queues guarding the
// worker queue and the thread pools respectively — TODO confirm in the constructor.
120 dispatch_queue_t workersWaitingSync;
121 dispatch_queue_t threadPoolSync;
// Dispatch semaphores; what signals and waits on them is defined in the implementation.
122 dispatch_semaphore_t workersUpdated;
123 dispatch_semaphore_t completed;
// Boolean flags, presumably toggled by enableSchedulingWorkers()/disableSchedulingWorkers() —
// TODO confirm.
124 bool mayMoreWorkersBeEnqueued;
125 bool mayMoreWorkersBeDequeued;
126
// Parallel-looking vectors of worker pointers and queue-node pointers.
// NOTE(review): whether index i of one corresponds to index i of the other is not
// established by this header.
127 vector<Worker *> workersDequeued;
128 vector<WorkerQueue::Node *> workersDequeuedNodes;
129
// Private helper returning a vector of worker pointers by value.
130 vector<Worker *> dequeueWorkers(void);
131
132public:
// Public interface of the enclosing class. Only prototypes appear in this header;
// all behavior is defined in the implementation file.
133 VuoThreadManager(void);
134 ~VuoThreadManager(void);
135 void enableSchedulingWorkers(void);
136 void disableSchedulingWorkers(void);
// Parameter list mirrors the `Worker` constructor overload that ends in `chainCount`,
// except that the composition is named by a C string instead of a hash.
137 void scheduleTriggerWorker(dispatch_queue_t queue, void *context, void (*function)(void *),
138 int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, const char *compositionIdentifier,
139 int chainCount);
// Parameter list mirrors the `Worker` constructor overload that ends in the upstream
// chain indices, again with a C-string composition identifier.
140 void scheduleChainWorker(dispatch_queue_t queue, void *context, void (*function)(void *),
141 int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, const char *compositionIdentifier,
142 unsigned long chainIndex, unsigned long *upstreamChainIndices, int upstreamChainIndicesCount);
// Takes thread bounds and identifies a chain by (eventId, compositionIdentifier, chainIndex);
// unlike the schedule* methods it takes no queue/context/function.
143 void grantThreadsToChain(int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, const char *compositionIdentifier,
144 unsigned long chainIndex);
// Identifies a chain as above and additionally names a subcomposition; takes no thread bounds.
145 void grantThreadsToSubcomposition(unsigned long eventId, const char *compositionIdentifier, unsigned long chainIndex, const char *subcompositionIdentifier);
// The trigger variant is keyed by event id alone; the chain variant also needs the
// composition identifier and chain index.
146 void returnThreadsForTriggerWorker(unsigned long eventId);
147 void returnThreadsForChainWorker(unsigned long eventId, const char *compositionIdentifier, unsigned long chainIndex);
148};
149
// C-linkage prototypes whose names and parameter lists parallel the class's public
// schedule/grant/return methods. Each takes a `VuoCompositionState *` where the method
// takes a `const char *compositionIdentifier`. Presumably each forwards to the
// corresponding method on a thread-manager instance — TODO confirm in the implementation.
150extern "C"
151{
152void vuoScheduleTriggerWorker(VuoCompositionState *compositionState, dispatch_queue_t queue, void *context, void (*function)(void *),
153 int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, int chainCount);
154void vuoScheduleChainWorker(VuoCompositionState *compositionState, dispatch_queue_t queue, void *context, void (*function)(void *),
155 int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, unsigned long chainIndex,
156 unsigned long *upstreamChainIndices, int upstreamChainIndicesCount);
157void vuoGrantThreadsToChain(VuoCompositionState *compositionState,
158 int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, unsigned long chainIndex);
// NOTE(review): `subcompositionIdentifier` is `char *` here but `const char *` in the
// class method grantThreadsToSubcomposition(); check whether the non-const form is intentional.
159void vuoGrantThreadsToSubcomposition(VuoCompositionState *compositionState, unsigned long eventId, unsigned long chainIndex,
160 char *subcompositionIdentifier);
161void vuoReturnThreadsForTriggerWorker(VuoCompositionState *compositionState, unsigned long eventId);
162void vuoReturnThreadsForChainWorker(VuoCompositionState *compositionState, unsigned long eventId, unsigned long chainIndex);
163}