Vuo  2.0.0
VuoThreadManager.hh
Go to the documentation of this file.
1 
10 #pragma once
11 
12 #include <dispatch/dispatch.h>
13 #include <map>
14 #include <set>
15 #include <vector>
16 using namespace std;
17 
18 #include "VuoCompositionState.h"
19 #include "VuoHeap.h"
20 
30 {
31 private:
/// Record describing one unit of work handed to the thread manager. Every member is public.
// NOTE(review): no destructor or copy operations are declared in this class, so the class itself
// never releases `context` or `upstreamChainIndices`; confirm in the implementation who owns them.
35  class Worker
36  {
37  public:
    /// Dispatch queue, opaque context pointer, and a callback that takes one `void *`.
    // presumably `function` is called with `context` on `queue` — TODO confirm against the definition
38  dispatch_queue_t queue;
39  void *context;
40  void (*function)(void *);
41 
    // presumably distinguishes trigger workers from chain workers; which constructor sets it
    // (and to what) is not visible in this header — TODO confirm
42  bool isTrigger;
43 
    /// Lower and upper bound on the number of threads this worker asks for.
44  int minThreadsNeeded;
45  int maxThreadsNeeded;
46 
    /// Identifiers carried with the worker.
    // presumably `compositionHash` is derived from the `compositionIdentifier` string accepted by
    // VuoThreadManager::schedule*Worker(); the conversion is not visible here — TODO confirm
47  unsigned long eventId;
48  unsigned long compositionHash;
49 
    /// Appears only in the parameter list of the first constructor below.
50  int chainCount;
51 
    /// Appear only in the parameter list of the second constructor below.
    /// `upstreamChainIndices` is a raw pointer paired with an element count.
52  unsigned long chainIndex;
53  unsigned long *upstreamChainIndices;
54  int upstreamChainIndicesCount;
55 
    /// Constructor taking `chainCount`; its parameters parallel VuoThreadManager::scheduleTriggerWorker(),
    /// with `compositionHash` in place of the identifier string.
    // NOTE(review): how the members not listed here (chainIndex, upstreamChainIndices, ...) are
    // initialized cannot be seen from the declaration.
56  Worker(dispatch_queue_t queue, void *context, void (*function)(void *), int minThreadsNeeded, int maxThreadsNeeded,
57  unsigned long eventId, unsigned long compositionHash, int chainCount);
    /// Constructor taking `chainIndex` and the upstream chain indices; its parameters parallel
    /// VuoThreadManager::scheduleChainWorker(), with `compositionHash` in place of the identifier string.
    // NOTE(review): whether `upstreamChainIndices` is copied or merely stored is not visible here.
58  Worker(dispatch_queue_t queue, void *context, void (*function)(void *), int minThreadsNeeded, int maxThreadsNeeded,
59  unsigned long eventId, unsigned long compositionHash, unsigned long chainIndex,
60  unsigned long *upstreamChainIndices, int upstreamChainIndicesCount);
61  };
62 
/// Container of Worker pointers wrapped in Node objects, tracked through `first` and `last`.
// presumably a linked list ordered by enqueue time (see getOldest/getNewest) — TODO confirm;
// the ordering and linking logic live in the implementation file.
70  class WorkerQueue
71  {
72  public:
    /// One queue entry; holds a non-const pointer to the Worker it wraps.
    // NOTE(review): the listing numbers jump from 78 to 81 inside this struct, so lines of Node
    // (possibly its link fields) may be missing from this view — check the real header.
76  struct Node
77  {
78  Worker *worker;
81  };
82 
    /// End pointers of the queue.
    // which of `first`/`last` is the oldest entry is not visible here — TODO confirm
83  Node *first;
84  Node *last;
85 
    /// Default constructor; declared only, so the initial values of `first`/`last` are set elsewhere.
86  WorkerQueue(void);
    /// Adds `worker` to the queue.
    // presumably allocates the wrapping Node internally, since only a Worker * is passed — TODO confirm
87  void enqueue(Worker *worker);
    /// Removes the given node from the queue.
    // whether the Node and/or its Worker are freed here is not visible — TODO confirm
88  void dequeue(Node *node);
    /// Traversal helper returning a Node pointer.
    // the roles of `newest` and `node`, and the end-of-traversal return value, are not visible
    // in this header — see the definition before relying on them
89  Node * getNextOldest(Node *newest, Node *node);
    /// Accessors for the two ends of the queue, and the number of entries.
    // return value for an empty queue is not visible here (presumably a null pointer) — TODO confirm
90  Node * getOldest(void);
91  Node * getNewest(void);
92  size_t size(void);
93  };
94 
/// Bookkeeping for a pool of threads that workers claim and return by id. Every member is public.
// NOTE(review): nothing in this declaration provides synchronization; presumably callers serialize
// access externally (VuoThreadManager declares a `threadPoolSync` queue) — TODO confirm.
100  class ThreadPool
101  {
102  public:
     /// Pool size and the number of threads currently unclaimed.
103  int totalThreads;
104  int threadsAvailable;
     // presumably maps a workerId (as passed to the methods below) to the number of threads
     // that worker currently holds — TODO confirm
105  map<unsigned long, int> workersClaimingThreads;
106 
     // presumably the expected number of workers and the ids of those that have finished;
     // how these are used is not visible in this header — TODO confirm
107  int totalWorkers;
108  set<unsigned long> workersCompleted;
109 
     /// Default constructor; initial member values are set in the definition, not here.
110  ThreadPool(void);
     /// Sets the pool size.
     // whether `threadsAvailable` is adjusted as well is not visible here — TODO confirm
111  void setTotalThreads(int totalThreads);
     /// Attempts to claim between `minThreadsNeeded` and `maxThreadsNeeded` threads for `workerId`.
     /// Returns a bool; `threadsClaimed` is a non-const reference, so the callee can write to it.
     // `VuoWarnUnusedResult` is a macro defined outside this file; by its name it presumably makes
     // ignoring the return value a compiler warning — TODO confirm
112  bool tryClaimThreads(int minThreadsNeeded, int maxThreadsNeeded, unsigned long workerId, int &threadsClaimed) VuoWarnUnusedResult;
     /// Returns an int for `workerId`.
     // presumably the number of threads that worker holds; the result for an unknown id is not
     // visible here — TODO confirm
113  int getThreadsClaimed(unsigned long workerId);
     /// Gives back the threads associated with `workerId`.
114  void returnThreads(unsigned long workerId);
115  };
116 
// Private state of VuoThreadManager. The notes below are inferred from names and types only;
// the code that reads and writes these members is in the implementation file.

     /// Workers that have been scheduled but not yet handed threads (per the member name).
117  WorkerQueue workersWaitingForThreads;
     /// A single ThreadPool held by value, plus a two-level map of further ThreadPools.
     // presumably the two `unsigned long` keys of `triggerThreadPools` are an event id and a
     // composition hash (in some order) — TODO confirm against the implementation
118  ThreadPool mainThreadPool;
119  map<unsigned long, map<unsigned long, ThreadPool> > triggerThreadPools;
     // presumably dispatch queues used to serialize access to the waiting-worker queue and to the
     // thread pools, respectively — TODO confirm
120  dispatch_queue_t workersWaitingSync;
121  dispatch_queue_t threadPoolSync;
     // presumably `workersUpdated` wakes the scheduling loop when workers or threads change, and
     // `completed` signals that the loop has finished — TODO confirm
122  dispatch_semaphore_t workersUpdated;
123  dispatch_semaphore_t completed;
     // presumably toggled by enableSchedulingWorkers()/disableSchedulingWorkers() — TODO confirm
124  bool mayMoreWorkersBeEnqueued;
125  bool mayMoreWorkersBeDequeued;
126 
     /// Two vectors with parallel names: dequeued Worker pointers and dequeued queue nodes.
     // NOTE(review): raw pointers; ownership and whether the two vectors are index-aligned are
     // not visible here.
127  vector<Worker *> workersDequeued;
128  vector<WorkerQueue::Node *> workersDequeuedNodes;
129 
     /// Returns a vector of Worker pointers by value.
     // presumably selects waiting workers whose thread requests can be satisfied — TODO confirm
130  vector<Worker *> dequeueWorkers(void);
131 
132 public:
// Public interface of VuoThreadManager. Only the declarations are visible here; behavior notes
// marked "presumably" must be checked against the implementation file.

     /// Constructor and destructor; the dispatch queues and semaphores declared as members are
     /// neither created nor released in this header.
133  VuoThreadManager(void);
134  ~VuoThreadManager(void);
     /// Turn worker scheduling on and off.
     // presumably these flip `mayMoreWorkersBeEnqueued` / `mayMoreWorkersBeDequeued` — TODO confirm
135  void enableSchedulingWorkers(void);
136  void disableSchedulingWorkers(void);
     /// Schedules a trigger worker. `function` takes one `void *`; `queue`, `context`, the thread
     /// bounds, `eventId` and `chainCount` correspond by name to members of the private Worker class.
     /// The composition is named by a C string here, whereas Worker stores an `unsigned long compositionHash`.
137  void scheduleTriggerWorker(dispatch_queue_t queue, void *context, void (*function)(void *),
138  int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, const char *compositionIdentifier,
139  int chainCount);
     /// Schedules a chain worker. Same leading parameters as scheduleTriggerWorker(), followed by the
     /// chain's index and an array of upstream chain indices with its element count.
     // NOTE(review): `upstreamChainIndices` is a non-const raw pointer; whether it is copied or must
     // outlive the call is not visible here.
140  void scheduleChainWorker(dispatch_queue_t queue, void *context, void (*function)(void *),
141  int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, const char *compositionIdentifier,
142  unsigned long chainIndex, unsigned long *upstreamChainIndices, int upstreamChainIndicesCount);
     /// Grants threads to the chain identified by (`eventId`, `compositionIdentifier`, `chainIndex`),
     /// given a minimum and maximum thread count. Returns nothing.
143  void grantThreadsToChain(int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, const char *compositionIdentifier,
144  unsigned long chainIndex);
     /// Grants threads to a subcomposition, identified by a second C string, on behalf of the chain
     /// identified by the first three parameters.
     // how many threads are granted is not expressed in the signature — see the definition
145  void grantThreadsToSubcomposition(unsigned long eventId, const char *compositionIdentifier, unsigned long chainIndex, const char *subcompositionIdentifier);
     /// Returns the threads held for a trigger worker; identified by `eventId` alone.
146  void returnThreadsForTriggerWorker(unsigned long eventId);
     /// Returns the threads held for a chain worker; identified by event, composition and chain index.
147  void returnThreadsForChainWorker(unsigned long eventId, const char *compositionIdentifier, unsigned long chainIndex);
148 };
149 
// C-linkage entry points whose names and parameter lists parallel the VuoThreadManager methods,
// with a `VuoCompositionState *` in place of the `compositionIdentifier` string.
// presumably each one resolves the thread manager and composition identifier from
// `compositionState` and forwards to the matching method — TODO confirm against the definitions.
150 extern "C"
151 {
// Parallels VuoThreadManager::scheduleTriggerWorker().
152 void vuoScheduleTriggerWorker(VuoCompositionState *compositionState, dispatch_queue_t queue, void *context, void (*function)(void *),
153  int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, int chainCount);
// Parallels VuoThreadManager::scheduleChainWorker().
154 void vuoScheduleChainWorker(VuoCompositionState *compositionState, dispatch_queue_t queue, void *context, void (*function)(void *),
155  int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, unsigned long chainIndex,
156  unsigned long *upstreamChainIndices, int upstreamChainIndicesCount);
// Parallels VuoThreadManager::grantThreadsToChain().
157 void vuoGrantThreadsToChain(VuoCompositionState *compositionState,
158  int minThreadsNeeded, int maxThreadsNeeded, unsigned long eventId, unsigned long chainIndex);
// Parallels VuoThreadManager::grantThreadsToSubcomposition().
// NOTE(review): `subcompositionIdentifier` is `char *` here but `const char *` in the method;
// if the definition never writes through it, both declaration and definition could be made const
// together — do not change only this prototype.
159 void vuoGrantThreadsToSubcomposition(VuoCompositionState *compositionState, unsigned long eventId, unsigned long chainIndex,
160  char *subcompositionIdentifier);
// Parallels VuoThreadManager::returnThreadsForTriggerWorker().
161 void vuoReturnThreadsForTriggerWorker(VuoCompositionState *compositionState, unsigned long eventId);
// Parallels VuoThreadManager::returnThreadsForChainWorker().
162 void vuoReturnThreadsForChainWorker(VuoCompositionState *compositionState, unsigned long eventId, unsigned long chainIndex);
163 }