Vuo  2.3.2
VuoRuntimeCommunicator.cc
Go to the documentation of this file.
1 
11 
12 #include <dlfcn.h>
13 #include <sstream>
14 #include "VuoEventLoop.h"
15 #include "VuoException.hh"
16 #include "VuoHeap.h"
17 #include "VuoNodeRegistry.hh"
19 #include "VuoRuntimeState.hh"
20 
25 {
26  this->persistentState = persistentState;
27 
28  _hasZmqConnection = false;
29 
30  zmqContext = NULL;
31  zmqControl = NULL;
32  zmqSelfReceive = NULL;
33  zmqSelfSend = NULL;
34  zmqTelemetry = NULL;
35 
36  controlQueue = dispatch_queue_create("org.vuo.runtime.control", NULL);
37  telemetryQueue = dispatch_queue_create("org.vuo.runtime.telemetry", NULL);
38  controlTimer = NULL;
39  telemetryTimer = NULL;
40  controlCanceled = dispatch_semaphore_create(0);
41  telemetryCanceled = dispatch_semaphore_create(0);
42 
43  runnerPipe = -1;
44 
45  vuoInstanceInit = NULL;
46  vuoInstanceTriggerStart = NULL;
47  vuoInstanceTriggerStop = NULL;
48  vuoSetInputPortValue = NULL;
49  getPublishedInputPortCount = NULL;
50  getPublishedOutputPortCount = NULL;
51  getPublishedInputPortNames = NULL;
52  getPublishedOutputPortNames = NULL;
53  getPublishedInputPortTypes = NULL;
54  getPublishedOutputPortTypes = NULL;
55  getPublishedInputPortDetails = NULL;
56  getPublishedOutputPortDetails = NULL;
57  firePublishedInputPortEvent = NULL;
58  setPublishedInputPortValue = NULL;
59  getPublishedInputPortValue = NULL;
60  getPublishedOutputPortValue = NULL;
61 }
62 
67 {
68  dispatch_release(controlQueue);
69  dispatch_release(telemetryQueue);
70  dispatch_release(controlCanceled);
71  dispatch_release(telemetryCanceled);
72 }
73 
79 void VuoRuntimeCommunicator::updateCompositionSymbols(void *compositionBinaryHandle)
80 {
81  ostringstream errorMessage;
82 
83  vuoInstanceInit = (vuoInstanceInitType) dlsym(compositionBinaryHandle, "vuoInstanceInit");
84  if (! vuoInstanceInit)
85  {
86  errorMessage << "The composition couldn't be started because its vuoInstanceInit() function couldn't be found : " << dlerror();
87  throw VuoException(errorMessage.str());
88  }
89 
90  vuoInstanceTriggerStart = (vuoInstanceTriggerStartType) dlsym(compositionBinaryHandle, "vuoInstanceTriggerStart");
91  if (! vuoInstanceTriggerStart)
92  {
93  errorMessage << "The composition couldn't be started because its vuoInstanceTriggerStart() function couldn't be found : " << dlerror();
94  throw VuoException(errorMessage.str());
95  }
96 
97  vuoInstanceTriggerStop = (vuoInstanceTriggerStopType) dlsym(compositionBinaryHandle, "vuoInstanceTriggerStop");
98  if (! vuoInstanceTriggerStop)
99  {
100  errorMessage << "The composition couldn't be started because its vuoInstanceTriggerStop() function couldn't be found : " << dlerror();
101  throw VuoException(errorMessage.str());
102  }
103 
104  vuoSetInputPortValue = (vuoSetInputPortValueType) dlsym(compositionBinaryHandle, "vuoSetInputPortValue");
105  if (! vuoSetInputPortValue)
106  {
107  errorMessage << "The composition couldn't be started because its vuoSetInputPortValue() function couldn't be found : " << dlerror();
108  throw VuoException(errorMessage.str());
109  }
110 
111  getPublishedInputPortCount = (getPublishedInputPortCountType) dlsym(compositionBinaryHandle, "getPublishedInputPortCount");
112  if (! getPublishedInputPortCount)
113  {
114  errorMessage << "The composition couldn't be started because its getPublishedInputPortCount() function couldn't be found : " << dlerror();
115  throw VuoException(errorMessage.str());
116  }
117 
118  getPublishedOutputPortCount = (getPublishedOutputPortCountType) dlsym(compositionBinaryHandle, "getPublishedOutputPortCount");
119  if (! getPublishedOutputPortCount)
120  {
121  errorMessage << "The composition couldn't be started because its getPublishedOutputPortCount() function couldn't be found : " << dlerror();
122  throw VuoException(errorMessage.str());
123  }
124 
125  getPublishedInputPortNames = (getPublishedInputPortNamesType) dlsym(compositionBinaryHandle, "getPublishedInputPortNames");
126  if (! getPublishedInputPortNames)
127  {
128  errorMessage << "The composition couldn't be started because its getPublishedInputPortNames() function couldn't be found : " << dlerror();
129  throw VuoException(errorMessage.str());
130  }
131 
132  getPublishedOutputPortNames = (getPublishedOutputPortNamesType) dlsym(compositionBinaryHandle, "getPublishedOutputPortNames");
133  if (! getPublishedOutputPortNames)
134  {
135  errorMessage << "The composition couldn't be started because its getPublishedOutputPortNames() function couldn't be found : " << dlerror();
136  throw VuoException(errorMessage.str());
137  }
138 
139  getPublishedInputPortTypes = (getPublishedInputPortTypesType) dlsym(compositionBinaryHandle, "getPublishedInputPortTypes");
140  if (! getPublishedInputPortTypes)
141  {
142  errorMessage << "The composition couldn't be started because its getPublishedInputPortTypes() function couldn't be found : " << dlerror();
143  throw VuoException(errorMessage.str());
144  }
145 
146  getPublishedOutputPortTypes = (getPublishedOutputPortTypesType) dlsym(compositionBinaryHandle, "getPublishedOutputPortTypes");
147  if (! getPublishedOutputPortTypes)
148  {
149  errorMessage << "The composition couldn't be started because its getPublishedOutputPortTypes() function couldn't be found : " << dlerror();
150  throw VuoException(errorMessage.str());
151  }
152 
153  getPublishedInputPortDetails = (getPublishedInputPortDetailsType) dlsym(compositionBinaryHandle, "getPublishedInputPortDetails");
154  if (! getPublishedInputPortDetails)
155  {
156  errorMessage << "The composition couldn't be started because its getPublishedInputPortDetails() function couldn't be found : " << dlerror();
157  throw VuoException(errorMessage.str());
158  }
159 
160  getPublishedOutputPortDetails = (getPublishedOutputPortDetailsType) dlsym(compositionBinaryHandle, "getPublishedOutputPortDetails");
161  if (! getPublishedOutputPortDetails)
162  {
163  errorMessage << "The composition couldn't be started because its getPublishedOutputPortDetails() function couldn't be found : " << dlerror();
164  throw VuoException(errorMessage.str());
165  }
166 
167  firePublishedInputPortEvent = (firePublishedInputPortEventType) dlsym(compositionBinaryHandle, "firePublishedInputPortEvent");
168  if (! firePublishedInputPortEvent)
169  {
170  errorMessage << "The composition couldn't be started because its firePublishedInputPortEvent() function couldn't be found : " << dlerror();
171  throw VuoException(errorMessage.str());
172  }
173 
174  setPublishedInputPortValue = (setPublishedInputPortValueType) dlsym(compositionBinaryHandle, "setPublishedInputPortValue");
175  if (! setPublishedInputPortValue)
176  {
177  errorMessage << "The composition couldn't be started because its setPublishedInputPortValue() function couldn't be found : " << dlerror();
178  throw VuoException(errorMessage.str());
179  }
180 
181  getPublishedInputPortValue = (getPublishedInputPortValueType) dlsym(compositionBinaryHandle, "getPublishedInputPortValue");
182  if (! getPublishedInputPortValue)
183  {
184  errorMessage << "The composition couldn't be started because its getPublishedInputPortValue() function couldn't be found : " << dlerror();
185  throw VuoException(errorMessage.str());
186  }
187 
188  getPublishedOutputPortValue = (getPublishedOutputPortValueType) dlsym(compositionBinaryHandle, "getPublishedOutputPortValue");
189  if (! getPublishedOutputPortValue)
190  {
191  errorMessage << "The composition couldn't be started because its getPublishedOutputPortValue() function couldn't be found : " << dlerror();
192  throw VuoException(errorMessage.str());
193  }
194 }
195 
201 void VuoRuntimeCommunicator::openConnection(void *_zmqContext, const char *controlURL, const char *telemetryURL, int runnerPipe)
202 {
203  this->runnerPipe = runnerPipe;
204 
205  if (controlURL && telemetryURL)
206  {
207  _hasZmqConnection = true;
208  zmqContext = (_zmqContext ? _zmqContext : zmq_init(1));
209 
210  zmqControl = zmq_socket(zmqContext,ZMQ_REP);
211  zmqTelemetry = zmq_socket(zmqContext,ZMQ_PUB);
212 
213  bool error = false;
214  ostringstream errorMessage;
215 
216  if(zmq_bind(zmqControl,controlURL))
217  {
218  errorMessage << "The composition couldn't start because it couldn't establish communication to be controlled by the runner : " << zmq_strerror(errno) << endl;
219  error = true;
220  }
221 
222  zmqSelfReceive = zmq_socket(zmqContext, ZMQ_PAIR);
223  if (zmq_bind(zmqSelfReceive, "inproc://vuo-runtime-self") != 0)
224  {
225  errorMessage << "Couldn't bind self-receive socket: " << zmq_strerror(errno) << " " << errno << endl;
226  error = true;
227  }
228 
229  zmqSelfSend = zmq_socket(zmqContext, ZMQ_PAIR);
230  if (zmq_connect(zmqSelfSend, "inproc://vuo-runtime-self") != 0)
231  {
232  errorMessage << "Couldn't connect self-send socket: " << zmq_strerror(errno) << " " << errno << endl;
233  error = true;
234  }
235 
236  const int highWaterMark = 0; // no limit
237  if(zmq_setsockopt(zmqTelemetry,ZMQ_SNDHWM,&highWaterMark,sizeof(highWaterMark)))
238  {
239  errorMessage << "Couldn't set high-water mark on telemetry socket: " << zmq_strerror(errno) << " " << errno << endl;
240  error = true;
241  }
242 
243  if(zmq_bind(zmqTelemetry,telemetryURL))
244  {
245  errorMessage << "The composition couldn't start because it couldn't establish communication to be listened to by the runner : " << zmq_strerror(errno) << endl;
246  error = true;
247  }
248 
249  if (error)
250  {
251  zmq_close(zmqControl);
252  zmqControl = NULL;
253  zmq_close(zmqSelfSend);
254  zmqSelfSend = NULL;
255  zmq_close(zmqSelfReceive);
256  zmqSelfReceive = NULL;
257  zmq_close(zmqTelemetry);
258  zmqTelemetry = NULL;
259  throw VuoException(errorMessage.str());
260  }
261  }
262 }
263 
268 {
269  if (! _hasZmqConnection)
270  return;
271 
272  zmq_term(zmqContext);
273  zmqContext = NULL;
274 
275  _hasZmqConnection = false;
276 }
277 
282 {
283  return _hasZmqConnection;
284 }
285 
290 {
291  return controlQueue;
292 }
293 
297 void VuoRuntimeCommunicator::sendControlReply(enum VuoControlReply reply, zmq_msg_t *messages, unsigned int messageCount)
298 {
299  vuoSend("VuoControl",zmqControl,reply,messages,messageCount,false,NULL);
300 }
301 
305 void VuoRuntimeCommunicator::sendTelemetry(enum VuoTelemetry type, zmq_msg_t *messages, unsigned int messageCount)
306 {
308  if (!zmqTelemetry)
309  return;
310 
311  dispatch_sync(telemetryQueue, ^{
313  vuoSend("VuoTelemetry",zmqTelemetry,type,messages,messageCount,true,NULL);
314  });
315 }
316 
322 void VuoRuntimeCommunicator::sendNodeExecutionStarted(const char *compositionIdentifier, const char *nodeIdentifier)
323 {
324  if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier)))
325  return;
326 
327  zmq_msg_t messages[2];
328  vuoInitMessageWithString(&messages[0], compositionIdentifier);
329  vuoInitMessageWithString(&messages[1], nodeIdentifier);
330 
331  sendTelemetry(VuoTelemetryNodeExecutionStarted, messages, 2);
332 }
333 
339 void VuoRuntimeCommunicator::sendNodeExecutionFinished(const char *compositionIdentifier, const char *nodeIdentifier)
340 {
341  if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier)))
342  return;
343 
344  zmq_msg_t messages[2];
345  vuoInitMessageWithString(&messages[0], compositionIdentifier);
346  vuoInitMessageWithString(&messages[1], nodeIdentifier);
347 
348  sendTelemetry(VuoTelemetryNodeExecutionFinished, messages, 2);
349 }
350 
356 void VuoRuntimeCommunicator::sendInputPortsUpdated(const char *compositionIdentifier, const char *portIdentifier, bool receivedEvent, bool receivedData, const char *portDataSummary)
357 {
358  bool isSendingAllTelemetry = isSubscribedToAllTelemetry(compositionIdentifier);
359  bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
360  if (! (isSendingAllTelemetry || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
361  return;
362 
363  zmq_msg_t messages[5];
364  vuoInitMessageWithString(&messages[0], compositionIdentifier);
365  vuoInitMessageWithString(&messages[1], portIdentifier);
366  vuoInitMessageWithBool(&messages[2], receivedEvent);
367  vuoInitMessageWithBool(&messages[3], receivedData);
368  vuoInitMessageWithString(&messages[4], (portDataSummary && (isSendingAllTelemetry || isSendingPortTelemetry)) ? portDataSummary : "");
369 
370  sendTelemetry(VuoTelemetryInputPortsUpdated, messages, 5);
371 }
372 
378 void VuoRuntimeCommunicator::sendOutputPortsUpdated(const char *compositionIdentifier, const char *portIdentifier, bool sentEvent, bool sentData, const char *portDataSummary)
379 {
380  bool isSendingAllTelemetry = isSubscribedToAllTelemetry(compositionIdentifier);
381  bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
382  if (! (isSendingAllTelemetry || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
383  return;
384 
385  zmq_msg_t messages[5];
386  vuoInitMessageWithString(&messages[0], compositionIdentifier);
387  vuoInitMessageWithString(&messages[1], portIdentifier);
388  vuoInitMessageWithBool(&messages[2], sentEvent);
389  vuoInitMessageWithBool(&messages[3], sentData);
390  vuoInitMessageWithString(&messages[4], (portDataSummary && (isSendingAllTelemetry || isSendingPortTelemetry)) ? portDataSummary : "");
391 
392  sendTelemetry(VuoTelemetryOutputPortsUpdated, messages, 5);
393 }
394 
398 void VuoRuntimeCommunicator::sendPublishedOutputPortsUpdated(const char *portIdentifier, bool sentData, const char *portDataSummary)
399 {
400  zmq_msg_t messages[3];
401  vuoInitMessageWithString(&messages[0], portIdentifier);
402  vuoInitMessageWithBool(&messages[1], sentData);
403  vuoInitMessageWithString(&messages[2], (portDataSummary ? portDataSummary : ""));
404 
405  sendTelemetry(VuoTelemetryPublishedOutputPortsUpdated, messages, 3);
406 }
407 
413 void VuoRuntimeCommunicator::sendEventFinished(unsigned long eventId, NodeContext *compositionContext)
414 {
415  if (! vuoFinishedExecutingEvent(compositionContext, eventId))
416  return;
417 
418  sendTelemetry(VuoTelemetryEventFinished, NULL, 0);
419 }
420 
426 void VuoRuntimeCommunicator::sendEventDropped(const char *compositionIdentifier, const char *portIdentifier)
427 {
428  bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
429  if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
430  return;
431 
432  zmq_msg_t messages[2];
433  vuoInitMessageWithString(&messages[0], compositionIdentifier);
434  vuoInitMessageWithString(&messages[1], portIdentifier);
435 
436  sendTelemetry(VuoTelemetryEventDropped, messages, 2);
437 }
438 
442 void VuoRuntimeCommunicator::sendError(const char *message)
443 {
444  if (! _hasZmqConnection)
445  return;
446 
447  zmq_msg_t messages[1];
448  vuoInitMessageWithString(&messages[0], message);
449 
450  sendTelemetry(VuoTelemetryError, messages, 1);
451 }
452 
457 {
458  sendTelemetry(VuoTelemetryStopRequested, NULL, 0);
459 }
460 
466 {
467  sendControlReply(VuoControlReplyCompositionStopping, NULL, 0);
468  zmq_close(zmqControl); // wait until message fully sends
469  zmqControl = NULL;
470 }
471 
475 void VuoRuntimeCommunicator::subscribeToPortDataTelemetry(const char *compositionIdentifier, const char *portIdentifer)
476 {
477  portsSendingDataTelemetry[compositionIdentifier].insert(portIdentifer);
478 }
479 
483 void VuoRuntimeCommunicator::unsubscribeFromPortDataTelemetry(const char *compositionIdentifier, const char *portIdentifer)
484 {
485  map<string, set<string> >::iterator iter1 = portsSendingDataTelemetry.find(compositionIdentifier);
486  if (iter1 != portsSendingDataTelemetry.end())
487  {
488  set<string>::iterator iter2 = iter1->second.find(portIdentifer);
489  if (iter2 != iter1->second.end())
490  {
491  iter1->second.erase(iter2);
492  if (iter1->second.empty())
493  portsSendingDataTelemetry.erase(iter1);
494  }
495  }
496 }
497 
501 bool VuoRuntimeCommunicator::isSubscribedToPortDataTelemetry(const char *compositionIdentifier, const char *portIdentifer)
502 {
503  map<string, set<string> >::iterator iter = portsSendingDataTelemetry.find(compositionIdentifier);
504  if (iter != portsSendingDataTelemetry.end())
505  return (iter->second.find(portIdentifer) != iter->second.end());
506 
507  return false;
508 }
509 
513 void VuoRuntimeCommunicator::subscribeToEventTelemetry(const char *compositionIdentifier)
514 {
515  compositionsSendingEventTelemetry.insert(compositionIdentifier);
516 }
517 
521 void VuoRuntimeCommunicator::unsubscribeFromEventTelemetry(const char *compositionIdentifier)
522 {
523  set<string>::iterator iter = compositionsSendingEventTelemetry.find(compositionIdentifier);
524  if (iter != compositionsSendingEventTelemetry.end())
525  compositionsSendingEventTelemetry.erase(iter);
526 }
527 
531 bool VuoRuntimeCommunicator::isSubscribedToEventTelemetry(const char *compositionIdentifier)
532 {
533  set<string>::iterator iter = compositionsSendingEventTelemetry.find(compositionIdentifier);
534  return (iter != compositionsSendingEventTelemetry.end());
535 }
536 
540 void VuoRuntimeCommunicator::subscribeToAllTelemetry(const char *compositionIdentifier)
541 {
542  compositionsSendingAllTelemetry.insert(compositionIdentifier);
543 }
544 
548 void VuoRuntimeCommunicator::unsubscribeFromAllTelemetry(const char *compositionIdentifier)
549 {
550  set<string>::iterator iter = compositionsSendingAllTelemetry.find(compositionIdentifier);
551  if (iter != compositionsSendingAllTelemetry.end())
552  compositionsSendingAllTelemetry.erase(iter);
553 }
554 
558 bool VuoRuntimeCommunicator::isSubscribedToAllTelemetry(const char *compositionIdentifier)
559 {
560  set<string>::iterator iter = compositionsSendingAllTelemetry.find(compositionIdentifier);
561  return (iter != compositionsSendingAllTelemetry.end());
562 }
563 
569 bool VuoRuntimeCommunicator::shouldSendPortDataTelemetry(const char *compositionIdentifier, const char *portIdentifier)
570 {
571  return (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier));
572 }
573 
579 char * VuoRuntimeCommunicator::mergeEnumDetails(string type, const char *details)
580 {
581  string allowedValuesFunctionName = type + "_getAllowedValues";
582  typedef void *(*allowedValuesFunctionType)(void);
583  allowedValuesFunctionType allowedValuesFunction = (allowedValuesFunctionType)dlsym(RTLD_SELF, allowedValuesFunctionName.c_str());
584  if (!allowedValuesFunction)
585  return NULL;
586 
587  string getJsonFunctionName = type + "_getJson";
588  typedef json_object *(*getJsonFunctionType)(int64_t);
589  getJsonFunctionType getJsonFunction = (getJsonFunctionType)dlsym(RTLD_SELF, getJsonFunctionName.c_str());
590  if (!getJsonFunction)
591  return NULL;
592 
593  string summaryFunctionName = type + "_getSummary";
594  typedef char *(*summaryFunctionType)(int64_t);
595  summaryFunctionType summaryFunction = (summaryFunctionType)dlsym(RTLD_SELF, summaryFunctionName.c_str());
596  if (!summaryFunction)
597  return NULL;
598 
599  string listCountFunctionName = "VuoListGetCount_" + type;
600  typedef unsigned long (*listCountFunctionType)(void *);
601  listCountFunctionType listCountFunction = (listCountFunctionType)dlsym(RTLD_SELF, listCountFunctionName.c_str());
602  if (!listCountFunction)
603  return NULL;
604 
605  string listValueFunctionName = "VuoListGetValue_" + type;
606  typedef int64_t (*listValueFunctionType)(void *, unsigned long);
607  listValueFunctionType listValueFunction = (listValueFunctionType)dlsym(RTLD_SELF, listValueFunctionName.c_str());
608  if (!listValueFunction)
609  return NULL;
610 
611  json_object *detailsJson = json_tokener_parse(details);
612  if (!detailsJson)
613  return NULL;
614 
615  void *allowedValues = allowedValuesFunction();
616  VuoRetain(allowedValues);
617  unsigned long listCount = listCountFunction(allowedValues);
618  json_object *menuItems = json_object_new_array();
619  for (unsigned long i = 1; i <= listCount; ++i)
620  {
621  int64_t value = listValueFunction(allowedValues, i);
622  json_object *js = getJsonFunction(value);
623  if (!json_object_is_type(js, json_type_string))
624  continue;
625  const char *key = json_object_get_string(js);
626  char *summary = summaryFunction(value);
627 
628  json_object *menuItem = json_object_new_object();
629  json_object_object_add(menuItem, "value", json_object_new_string(key));
630  json_object_object_add(menuItem, "name", json_object_new_string(summary));
631  json_object_array_add(menuItems, menuItem);
632 
633  free(summary);
634  }
635  VuoRelease(allowedValues);
636 
637  if (json_object_array_length(menuItems))
638  json_object_object_add(detailsJson, "menuItems", menuItems);
639 
640  char *newDetails = strdup(json_object_to_json_string(detailsJson));
641  json_object_put(detailsJson);
642 
643  return newDetails;
644 }
645 
646 void VuoRuntimeCommunicator::sendHeartbeat(bool blocking)
647 {
648  struct rusage r;
649  if(getrusage(RUSAGE_SELF,&r))
650  {
651  VUserLog("The composition couldn't get the information to send for VuoTelemetryStats : %s", strerror(errno));
652  return;
653  }
654 
655  zmq_msg_t messages[2];
656 
657  {
658  uint64_t utime = r.ru_utime.tv_sec*USEC_PER_SEC+r.ru_utime.tv_usec;
659  zmq_msg_init_size(&messages[0], sizeof utime);
660  memcpy(zmq_msg_data(&messages[0]), &utime, sizeof utime);
661  }
662 
663  {
664  uint64_t stime = r.ru_stime.tv_sec*USEC_PER_SEC+r.ru_stime.tv_usec;
665  zmq_msg_init_size(&messages[1], sizeof stime);
666  memcpy(zmq_msg_data(&messages[1]), &stime, sizeof stime);
667  }
668 
669  if (blocking)
670  // When called with blocking=true, we're already on telemetryQueue.
671  vuoSend("VuoTelemetry", zmqTelemetry, VuoTelemetryStats, messages, 2, false, NULL);
672  else
673  sendTelemetry(VuoTelemetryStats, messages, 2);
674 }
675 
680 {
681  if (! zmqTelemetry)
682  return;
683 
684  dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
685  telemetryTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
686  dispatch_source_set_timer(telemetryTimer, dispatch_walltime(NULL, 0), NSEC_PER_SEC/1000, NSEC_PER_SEC/1000);
687 
688  dispatch_source_set_event_handler(telemetryTimer, ^{
689  sendHeartbeat();
690  });
691 
692  dispatch_source_set_cancel_handler(telemetryTimer, ^{
693  dispatch_semaphore_signal(telemetryCanceled);
694  });
695 
696  dispatch_resume(telemetryTimer);
697 }
698 
703 {
704  if (! zmqTelemetry)
705  return;
706 
707  dispatch_source_cancel(telemetryTimer);
708  dispatch_semaphore_wait(telemetryCanceled, DISPATCH_TIME_FOREVER);
709  dispatch_sync(telemetryQueue, ^{
710  // zmq_close calls POSIX close(), whose documentation says "queued data are discarded".
711  // Since telemetry uses non-blocking sends, issue one final _blocking_ send
712  // to ensure that the queue is drained before we close.
713  sendHeartbeat(true);
714 
715  zmq_close(zmqTelemetry);
716  zmqTelemetry = NULL;
717  });
718 
719  dispatch_release(telemetryTimer);
720  telemetryTimer = NULL;
721 }
722 
727 {
728  if (! zmqControl)
729  return;
730 
731  controlTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, controlQueue);
732  dispatch_source_set_timer(controlTimer, dispatch_walltime(NULL,0), NSEC_PER_SEC/1000, NSEC_PER_SEC/1000);
733 
734  dispatch_source_set_event_handler(controlTimer, ^{
735 
737 
738  zmq_pollitem_t items[]=
739  {
740  {zmqControl,0,ZMQ_POLLIN,0},
741  {zmqSelfReceive,0,ZMQ_POLLIN,0},
742  };
743  int itemCount = 2;
744  long timeout = -1; // Wait forever (VuoStopComposition will send a message ZMQSelfReceive when it's time to stop).
745  zmq_poll(items,itemCount,timeout);
746  if(!(items[0].revents & ZMQ_POLLIN))
747  return;
748 
749  enum VuoControlRequest control = (enum VuoControlRequest) vuoReceiveInt(zmqControl, NULL);
750  VuoCompositionState compositionState = { (void *)persistentState->runtimeState, "" };
751 
752  switch (control)
753  {
755  {
756  dispatch_source_set_timer(telemetryTimer, dispatch_walltime(NULL, 0), NSEC_PER_SEC/2, NSEC_PER_SEC/100);
757 
758  sendControlReply(VuoControlReplyHeartbeatSlowed,NULL,0);
759  break;
760  }
762  {
763  int timeoutInSeconds = vuoReceiveInt(zmqControl, NULL);
764  bool isBeingReplaced = vuoReceiveBool(zmqControl, NULL);
765  bool isLastEverInProcess = vuoReceiveBool(zmqControl, NULL);
766 
767  persistentState->runtimeState->stopCompositionAsOrderedByRunner(isBeingReplaced, timeoutInSeconds, isLastEverInProcess);
768 
769  sendControlReply(VuoControlReplyCompositionStopping,NULL,0);
770  break;
771  }
773  {
774  persistentState->runtimeState->pauseComposition();
775 
776  sendControlReply(VuoControlReplyCompositionPaused,NULL,0);
777  break;
778  }
780  {
781  persistentState->runtimeState->unpauseComposition();
782 
783  sendControlReply(VuoControlReplyCompositionUnpaused,NULL,0);
784  break;
785  }
788  {
789  bool shouldUseInterprocessSerialization = vuoReceiveBool(zmqControl, NULL);
790  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
791  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
792 
793  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
794  char *valueAsString = persistentState->nodeRegistry->getPortValue(&compositionState, portIdentifier, shouldUseInterprocessSerialization);
795 
796  zmq_msg_t messages[1];
797  vuoInitMessageWithString(&messages[0], valueAsString);
798  sendControlReply(control == VuoControlRequestInputPortValueRetrieve ?
800  messages,1);
801 
802  free(compositionIdentifier);
803  free(portIdentifier);
804  free(valueAsString);
805  break;
806  }
809  {
810  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
811  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
812 
813  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
814  char *summary = persistentState->nodeRegistry->getPortSummary(&compositionState, portIdentifier);
815 
816  zmq_msg_t messages[1];
817  vuoInitMessageWithString(&messages[0], summary);
818  sendControlReply(control == VuoControlRequestInputPortSummaryRetrieve ?
820  messages,1);
821 
822  free(compositionIdentifier);
823  free(portIdentifier);
824  free(summary);
825  break;
826  }
828  {
829  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
830  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
831 
832  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
833  persistentState->nodeRegistry->fireTriggerPortEvent(&compositionState, portIdentifier);
834 
835  sendControlReply(VuoControlReplyTriggerPortFiredEvent,NULL,0);
836 
837  free(compositionIdentifier);
838  free(portIdentifier);
839  break;
840  }
842  {
843  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
844  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
845  char *valueAsString = vuoReceiveAndCopyString(zmqControl, NULL);
846 
847  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
848  persistentState->nodeRegistry->setPortValue(&compositionState, portIdentifier, valueAsString);
849 
850  sendControlReply(VuoControlReplyInputPortValueModified,NULL,0);
851 
852  free(compositionIdentifier);
853  free(portIdentifier);
854  free(valueAsString);
855  break;
856  }
858  {
859  int count = getPublishedInputPortCount();
860  char **names = getPublishedInputPortNames();
861 
862  zmq_msg_t messages[count];
863  for (int i = 0; i < count; ++i)
864  vuoInitMessageWithString(&messages[i], names[i]);
865 
866  sendControlReply(VuoControlReplyPublishedInputPortNamesRetrieved,messages,count);
867  break;
868  }
870  {
871  int count = getPublishedOutputPortCount();
872  char **names = getPublishedOutputPortNames();
873 
874  zmq_msg_t messages[count];
875  for (int i = 0; i < count; ++i)
876  vuoInitMessageWithString(&messages[i], names[i]);
877 
878  sendControlReply(VuoControlReplyPublishedOutputPortNamesRetrieved,messages,count);
879  break;
880  }
882  {
883  int count = getPublishedInputPortCount();
884  char **names = getPublishedInputPortTypes();
885 
886  zmq_msg_t messages[count];
887  for (int i = 0; i < count; ++i)
888  vuoInitMessageWithString(&messages[i], names[i]);
889 
890  sendControlReply(VuoControlReplyPublishedInputPortTypesRetrieved,messages,count);
891  break;
892  }
894  {
895  int count = getPublishedOutputPortCount();
896  char **names = getPublishedOutputPortTypes();
897 
898  zmq_msg_t messages[count];
899  for (int i = 0; i < count; ++i)
900  vuoInitMessageWithString(&messages[i], names[i]);
901 
902  sendControlReply(VuoControlReplyPublishedOutputPortTypesRetrieved,messages,count);
903  break;
904  }
906  {
907  int count = getPublishedInputPortCount();
908  char **types = getPublishedInputPortTypes();
909  char **names = getPublishedInputPortDetails();
910 
911  zmq_msg_t messages[count];
912  for (int i = 0; i < count; ++i)
913  {
914  char *newDetails = mergeEnumDetails(types[i], names[i]);
915  if (newDetails)
916  names[i] = newDetails;
917  vuoInitMessageWithString(&messages[i], names[i]);
918  if (newDetails)
919  free(newDetails);
920  }
921 
922  sendControlReply(VuoControlReplyPublishedInputPortDetailsRetrieved,messages,count);
923  break;
924  }
926  {
927  int count = getPublishedOutputPortCount();
928  char **names = getPublishedOutputPortDetails();
929 
930  zmq_msg_t messages[count];
931  for (int i = 0; i < count; ++i)
932  vuoInitMessageWithString(&messages[i], names[i]);
933 
934  sendControlReply(VuoControlReplyPublishedOutputPortDetailsRetrieved,messages,count);
935  break;
936  }
938  {
939  int count = vuoReceiveInt(zmqControl, NULL);
940 
941  char **names = (char **)malloc(count * sizeof(char *));
942  for (int i = 0; i < count; ++i)
943  names[i] = vuoReceiveAndCopyString(zmqControl, NULL);
944 
945  firePublishedInputPortEvent(names, count);
946 
947  for (int i = 0; i < count; ++i)
948  free(names[i]);
949  free(names);
950 
951  sendControlReply(VuoControlReplyPublishedInputPortFiredEvent,NULL,0);
952  break;
953  }
955  {
957 
958  while (VuoTelemetry_hasMoreToReceive(zmqControl))
959  {
960  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
961  char *valueAsString = vuoReceiveAndCopyString(zmqControl, NULL);
962 
963  setPublishedInputPortValue(portIdentifier, valueAsString);
964 
965  free(portIdentifier);
966  free(valueAsString);
967  }
968 
969  sendControlReply(VuoControlReplyPublishedInputPortValueModified,NULL,0);
970 
972  break;
973  }
975  {
977 
978  bool shouldUseInterprocessSerialization = vuoReceiveBool(zmqControl, NULL);
979  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
980  char *valueAsString = getPublishedInputPortValue(portIdentifier, shouldUseInterprocessSerialization);
981  free(portIdentifier);
982  zmq_msg_t messages[1];
983  vuoInitMessageWithString(&messages[0], valueAsString);
984  free(valueAsString);
985  sendControlReply(VuoControlReplyPublishedInputPortValueRetrieved,messages,1);
986 
988  break;
989  }
991  {
993 
994  bool shouldUseInterprocessSerialization = vuoReceiveBool(zmqControl, NULL);
995  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
996  char *valueAsString = getPublishedOutputPortValue(portIdentifier, shouldUseInterprocessSerialization);
997  free(portIdentifier);
998  zmq_msg_t messages[1];
999  vuoInitMessageWithString(&messages[0], valueAsString);
1000  free(valueAsString);
1001  sendControlReply(VuoControlReplyPublishedOutputPortValueRetrieved,messages,1);
1002 
1004  break;
1005  }
1008  {
1009  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1010  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1011 
1012  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1013  subscribeToPortDataTelemetry(compositionState.compositionIdentifier, portIdentifier);
1014 
1015  char *summary = persistentState->nodeRegistry->getPortSummary(&compositionState, portIdentifier);
1016 
1017  zmq_msg_t messages[1];
1018  vuoInitMessageWithString(&messages[0], summary);
1019  sendControlReply(control == VuoControlRequestInputPortTelemetrySubscribe ?
1021  messages,1);
1022 
1023  free(compositionIdentifier);
1024  free(portIdentifier);
1025  free(summary);
1026  break;
1027  }
1030  {
1031  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1032  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1033 
1034  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1035  unsubscribeFromPortDataTelemetry(compositionState.compositionIdentifier, portIdentifier);
1036 
1037  sendControlReply(control == VuoControlRequestInputPortTelemetryUnsubscribe ?
1039  NULL,0);
1040 
1041  free(compositionIdentifier);
1042  free(portIdentifier);
1043  break;
1044  }
1046  {
1047  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1048 
1049  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1050  subscribeToEventTelemetry(compositionState.compositionIdentifier);
1051 
1052  sendControlReply(VuoControlReplyEventTelemetrySubscribed,NULL,0);
1053 
1054  free(compositionIdentifier);
1055  break;
1056  }
1058  {
1059  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1060 
1061  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1062  unsubscribeFromEventTelemetry(compositionState.compositionIdentifier);
1063 
1064  sendControlReply(VuoControlReplyEventTelemetryUnsubscribed,NULL,0);
1065 
1066  free(compositionIdentifier);
1067  break;
1068  }
1070  {
1071  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1072 
1073  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1074  subscribeToAllTelemetry(compositionState.compositionIdentifier);
1075 
1076  sendControlReply(VuoControlReplyAllTelemetrySubscribed,NULL,0);
1077 
1078  free(compositionIdentifier);
1079  break;
1080  }
1082  {
1083  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1084 
1085  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1086  unsubscribeFromAllTelemetry(compositionState.compositionIdentifier);
1087 
1088  sendControlReply(VuoControlReplyAllTelemetryUnsubscribed,NULL,0);
1089 
1090  free(compositionIdentifier);
1091  break;
1092  }
1093  }
1094  });
1095 
1096  dispatch_source_set_cancel_handler(controlTimer, ^{
1097 
1098  persistentState->runtimeState->breakOutOfEventLoop();
1099  dispatch_semaphore_signal(controlCanceled);
1100 
1101  });
1102 
1103  dispatch_resume(controlTimer);
1104 }
1105 
1110 {
1111  if (! zmqControl)
1112  return;
1113 
1114  dispatch_source_cancel(controlTimer);
1115 }
1116 
1121 {
1122  if (! zmqSelfSend)
1123  return;
1124 
1125  char z = 0;
1126  zmq_msg_t message;
1127  zmq_msg_init_size(&message, sizeof z);
1128  memcpy(zmq_msg_data(&message), &z, sizeof z);
1129  if (zmq_msg_send(&message, static_cast<zmq_msg_t *>(zmqSelfSend), 0) == -1)
1130  VUserLog("Couldn't break: %s (%d)", zmq_strerror(errno), errno);
1131  zmq_msg_close(&message);
1132 }
1133 
1138 {
1139  if (! zmqControl)
1140  return;
1141 
1142  dispatch_semaphore_wait(controlCanceled, DISPATCH_TIME_FOREVER);
1143  dispatch_sync(controlQueue, ^{
1144  zmq_close(zmqControl);
1145  zmqControl = NULL;
1146  });
1147 
1148  dispatch_release(controlTimer);
1149  controlTimer = NULL;
1150 
1151  if (zmqSelfSend)
1152  {
1153  zmq_close(zmqSelfSend);
1154  zmqSelfSend = NULL;
1155  }
1156 
1157  if (zmqSelfReceive)
1158  {
1159  zmq_close(zmqSelfReceive);
1160  zmqSelfReceive = NULL;
1161  }
1162 }
1163 
1169 {
1170  if (runnerPipe < 0)
1171  return;
1172 
1173  dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
1174  dispatch_async(queue, ^{
1175  char buf[1];
1176  int ret;
1177  int readError;
1178  do {
1179  ret = read(runnerPipe, &buf, 1);
1180  readError = errno;
1181  usleep(USEC_PER_SEC / 100);
1182  } while (ret == -1 && readError == EAGAIN);
1183 
1184  _hasZmqConnection = false;
1186  });
1187 }
1188 
1189 extern "C"
1190 {
1194 void vuoSendNodeExecutionStarted(VuoCompositionState *compositionState, const char *nodeIdentifier)
1195 {
1196  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1197  return runtimeState->persistentState->communicator->sendNodeExecutionStarted(compositionState->compositionIdentifier, nodeIdentifier);
1198 }
1199 
1203 void vuoSendNodeExecutionFinished(VuoCompositionState *compositionState, const char *nodeIdentifier)
1204 {
1205  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1206  return runtimeState->persistentState->communicator->sendNodeExecutionFinished(compositionState->compositionIdentifier, nodeIdentifier);
1207 }
1208 
1212 void vuoSendInputPortsUpdated(VuoCompositionState *compositionState, const char *portIdentifier, bool receivedEvent, bool receivedData, const char *portDataSummary)
1213 {
1214  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1215  return runtimeState->persistentState->communicator->sendInputPortsUpdated(compositionState->compositionIdentifier, portIdentifier, receivedEvent, receivedData, portDataSummary);
1216 }
1217 
1223 void vuoSendOutputPortsUpdated(VuoCompositionState *compositionState, const char *portIdentifier, bool sentEvent, bool sentData, const char *portDataSummary)
1224 {
1225  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1226  return runtimeState->persistentState->communicator->sendOutputPortsUpdated(compositionState->compositionIdentifier, portIdentifier, sentEvent, sentData, portDataSummary);
1227 }
1228 
1232 void vuoSendPublishedOutputPortsUpdated(VuoCompositionState *compositionState, const char *portIdentifier, bool sentData, const char *portDataSummary)
1233 {
1234  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1235  return runtimeState->persistentState->communicator->sendPublishedOutputPortsUpdated(portIdentifier, sentData, portDataSummary);
1236 }
1237 
1243 void vuoSendEventFinished(VuoCompositionState *compositionState, unsigned long eventId)
1244 {
1245  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1247  return runtimeState->persistentState->communicator->sendEventFinished(eventId, compositionContext);
1248 }
1249 
1253 void vuoSendEventDropped(VuoCompositionState *compositionState, const char *portIdentifier)
1254 {
1255  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1256  return runtimeState->persistentState->communicator->sendEventDropped(compositionState->compositionIdentifier, portIdentifier);
1257 }
1258 
1262 void vuoSendError(VuoCompositionState *compositionState, const char *message)
1263 {
1264  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1266 }
1267 
1271 bool vuoShouldSendPortDataTelemetry(VuoCompositionState *compositionState, const char *portIdentifier)
1272 {
1273  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1274  return runtimeState->persistentState->communicator->shouldSendPortDataTelemetry(compositionState->compositionIdentifier, portIdentifier);
1275 }
1276 
1280 char * vuoGetInputPortString(VuoCompositionState *compositionState, const char *portIdentifier, bool shouldUseInterprocessSerialization)
1281 {
1282  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1283  return runtimeState->persistentState->nodeRegistry->getPortValue(compositionState, portIdentifier, shouldUseInterprocessSerialization);
1284 }
1285 
1289 char * vuoGetOutputPortString(VuoCompositionState *compositionState, const char *portIdentifier, bool shouldUseInterprocessSerialization)
1290 {
1291  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1292  return runtimeState->persistentState->nodeRegistry->getPortValue(compositionState, portIdentifier, shouldUseInterprocessSerialization);
1293 }
1294 
1295 } // extern "C"