Vuo  2.0.0
VuoRuntimeCommunicator.cc
Go to the documentation of this file.
1 
11 
12 #include <dlfcn.h>
13 #include <sstream>
14 #include "VuoEventLoop.h"
15 #include "VuoException.hh"
16 #include "VuoHeap.h"
17 #include "VuoNodeRegistry.hh"
19 #include "VuoRuntimeState.hh"
20 
25 {
26  this->persistentState = persistentState;
27 
28  _hasZmqConnection = false;
29 
30  zmqContext = NULL;
31  zmqControl = NULL;
32  zmqSelfReceive = NULL;
33  zmqSelfSend = NULL;
34  zmqTelemetry = NULL;
35 
36  controlQueue = dispatch_queue_create("org.vuo.runtime.control", NULL);
37  telemetryQueue = dispatch_queue_create("org.vuo.runtime.telemetry", NULL);
38  controlTimer = NULL;
39  telemetryTimer = NULL;
40  controlCanceled = dispatch_semaphore_create(0);
41  telemetryCanceled = dispatch_semaphore_create(0);
42 
43  runnerPipe = -1;
44 
45  vuoInstanceInit = NULL;
46  vuoInstanceTriggerStart = NULL;
47  vuoInstanceTriggerStop = NULL;
48  vuoSetInputPortValue = NULL;
49  getPublishedInputPortCount = NULL;
50  getPublishedOutputPortCount = NULL;
51  getPublishedInputPortNames = NULL;
52  getPublishedOutputPortNames = NULL;
53  getPublishedInputPortTypes = NULL;
54  getPublishedOutputPortTypes = NULL;
55  getPublishedInputPortDetails = NULL;
56  getPublishedOutputPortDetails = NULL;
57  firePublishedInputPortEvent = NULL;
58  setPublishedInputPortValue = NULL;
59  getPublishedInputPortValue = NULL;
60  getPublishedOutputPortValue = NULL;
61 }
62 
67 {
68  dispatch_release(controlQueue);
69  dispatch_release(telemetryQueue);
70  dispatch_release(controlCanceled);
71  dispatch_release(telemetryCanceled);
72 }
73 
79 void VuoRuntimeCommunicator::updateCompositionSymbols(void *compositionBinaryHandle)
80 {
81  ostringstream errorMessage;
82 
83  vuoInstanceInit = (vuoInstanceInitType) dlsym(compositionBinaryHandle, "vuoInstanceInit");
84  if (! vuoInstanceInit)
85  {
86  errorMessage << "The composition couldn't be started because its vuoInstanceInit() function couldn't be found : " << dlerror();
87  throw VuoException(errorMessage.str());
88  }
89 
90  vuoInstanceTriggerStart = (vuoInstanceTriggerStartType) dlsym(compositionBinaryHandle, "vuoInstanceTriggerStart");
91  if (! vuoInstanceTriggerStart)
92  {
93  errorMessage << "The composition couldn't be started because its vuoInstanceTriggerStart() function couldn't be found : " << dlerror();
94  throw VuoException(errorMessage.str());
95  }
96 
97  vuoInstanceTriggerStop = (vuoInstanceTriggerStopType) dlsym(compositionBinaryHandle, "vuoInstanceTriggerStop");
98  if (! vuoInstanceTriggerStop)
99  {
100  errorMessage << "The composition couldn't be started because its vuoInstanceTriggerStop() function couldn't be found : " << dlerror();
101  throw VuoException(errorMessage.str());
102  }
103 
104  vuoSetInputPortValue = (vuoSetInputPortValueType) dlsym(compositionBinaryHandle, "vuoSetInputPortValue");
105  if (! vuoSetInputPortValue)
106  {
107  errorMessage << "The composition couldn't be started because its vuoSetInputPortValue() function couldn't be found : " << dlerror();
108  throw VuoException(errorMessage.str());
109  }
110 
111  getPublishedInputPortCount = (getPublishedInputPortCountType) dlsym(compositionBinaryHandle, "getPublishedInputPortCount");
112  if (! getPublishedInputPortCount)
113  {
114  errorMessage << "The composition couldn't be started because its getPublishedInputPortCount() function couldn't be found : " << dlerror();
115  throw VuoException(errorMessage.str());
116  }
117 
118  getPublishedOutputPortCount = (getPublishedOutputPortCountType) dlsym(compositionBinaryHandle, "getPublishedOutputPortCount");
119  if (! getPublishedOutputPortCount)
120  {
121  errorMessage << "The composition couldn't be started because its getPublishedOutputPortCount() function couldn't be found : " << dlerror();
122  throw VuoException(errorMessage.str());
123  }
124 
125  getPublishedInputPortNames = (getPublishedInputPortNamesType) dlsym(compositionBinaryHandle, "getPublishedInputPortNames");
126  if (! getPublishedInputPortNames)
127  {
128  errorMessage << "The composition couldn't be started because its getPublishedInputPortNames() function couldn't be found : " << dlerror();
129  throw VuoException(errorMessage.str());
130  }
131 
132  getPublishedOutputPortNames = (getPublishedOutputPortNamesType) dlsym(compositionBinaryHandle, "getPublishedOutputPortNames");
133  if (! getPublishedOutputPortNames)
134  {
135  errorMessage << "The composition couldn't be started because its getPublishedOutputPortNames() function couldn't be found : " << dlerror();
136  throw VuoException(errorMessage.str());
137  }
138 
139  getPublishedInputPortTypes = (getPublishedInputPortTypesType) dlsym(compositionBinaryHandle, "getPublishedInputPortTypes");
140  if (! getPublishedInputPortTypes)
141  {
142  errorMessage << "The composition couldn't be started because its getPublishedInputPortTypes() function couldn't be found : " << dlerror();
143  throw VuoException(errorMessage.str());
144  }
145 
146  getPublishedOutputPortTypes = (getPublishedOutputPortTypesType) dlsym(compositionBinaryHandle, "getPublishedOutputPortTypes");
147  if (! getPublishedOutputPortTypes)
148  {
149  errorMessage << "The composition couldn't be started because its getPublishedOutputPortTypes() function couldn't be found : " << dlerror();
150  throw VuoException(errorMessage.str());
151  }
152 
153  getPublishedInputPortDetails = (getPublishedInputPortDetailsType) dlsym(compositionBinaryHandle, "getPublishedInputPortDetails");
154  if (! getPublishedInputPortDetails)
155  {
156  errorMessage << "The composition couldn't be started because its getPublishedInputPortDetails() function couldn't be found : " << dlerror();
157  throw VuoException(errorMessage.str());
158  }
159 
160  getPublishedOutputPortDetails = (getPublishedOutputPortDetailsType) dlsym(compositionBinaryHandle, "getPublishedOutputPortDetails");
161  if (! getPublishedOutputPortDetails)
162  {
163  errorMessage << "The composition couldn't be started because its getPublishedOutputPortDetails() function couldn't be found : " << dlerror();
164  throw VuoException(errorMessage.str());
165  }
166 
167  firePublishedInputPortEvent = (firePublishedInputPortEventType) dlsym(compositionBinaryHandle, "firePublishedInputPortEvent");
168  if (! firePublishedInputPortEvent)
169  {
170  errorMessage << "The composition couldn't be started because its firePublishedInputPortEvent() function couldn't be found : " << dlerror();
171  throw VuoException(errorMessage.str());
172  }
173 
174  setPublishedInputPortValue = (setPublishedInputPortValueType) dlsym(compositionBinaryHandle, "setPublishedInputPortValue");
175  if (! setPublishedInputPortValue)
176  {
177  errorMessage << "The composition couldn't be started because its setPublishedInputPortValue() function couldn't be found : " << dlerror();
178  throw VuoException(errorMessage.str());
179  }
180 
181  getPublishedInputPortValue = (getPublishedInputPortValueType) dlsym(compositionBinaryHandle, "getPublishedInputPortValue");
182  if (! getPublishedInputPortValue)
183  {
184  errorMessage << "The composition couldn't be started because its getPublishedInputPortValue() function couldn't be found : " << dlerror();
185  throw VuoException(errorMessage.str());
186  }
187 
188  getPublishedOutputPortValue = (getPublishedOutputPortValueType) dlsym(compositionBinaryHandle, "getPublishedOutputPortValue");
189  if (! getPublishedOutputPortValue)
190  {
191  errorMessage << "The composition couldn't be started because its getPublishedOutputPortValue() function couldn't be found : " << dlerror();
192  throw VuoException(errorMessage.str());
193  }
194 }
195 
201 void VuoRuntimeCommunicator::openConnection(void *_zmqContext, const char *controlURL, const char *telemetryURL, int runnerPipe)
202 {
203  this->runnerPipe = runnerPipe;
204 
205  if (controlURL && telemetryURL)
206  {
207  _hasZmqConnection = true;
208  zmqContext = (_zmqContext ? _zmqContext : zmq_init(1));
209 
210  zmqControl = zmq_socket(zmqContext,ZMQ_REP);
211  zmqTelemetry = zmq_socket(zmqContext,ZMQ_PUB);
212 
213  bool error = false;
214  ostringstream errorMessage;
215 
216  if(zmq_bind(zmqControl,controlURL))
217  {
218  errorMessage << "The composition couldn't start because it couldn't establish communication to be controlled by the runner : " << zmq_strerror(errno) << endl;
219  error = true;
220  }
221 
222  zmqSelfReceive = zmq_socket(zmqContext, ZMQ_PAIR);
223  if (zmq_bind(zmqSelfReceive, "inproc://vuo-runtime-self") != 0)
224  {
225  errorMessage << "Couldn't bind self-receive socket: " << zmq_strerror(errno) << " " << errno << endl;
226  error = true;
227  }
228 
229  zmqSelfSend = zmq_socket(zmqContext, ZMQ_PAIR);
230  if (zmq_connect(zmqSelfSend, "inproc://vuo-runtime-self") != 0)
231  {
232  errorMessage << "Couldn't connect self-send socket: " << zmq_strerror(errno) << " " << errno << endl;
233  error = true;
234  }
235 
236  if(zmq_bind(zmqTelemetry,telemetryURL))
237  {
238  errorMessage << "The composition couldn't start because it couldn't establish communication to be listened to by the runner : " << zmq_strerror(errno) << endl;
239  error = true;
240  }
241 
242  if (error)
243  {
244  zmq_close(zmqControl);
245  zmqControl = NULL;
246  zmq_close(zmqSelfSend);
247  zmqSelfSend = NULL;
248  zmq_close(zmqSelfReceive);
249  zmqSelfReceive = NULL;
250  zmq_close(zmqTelemetry);
251  zmqTelemetry = NULL;
252  throw VuoException(errorMessage.str());
253  }
254  }
255 }
256 
261 {
262  if (! _hasZmqConnection)
263  return;
264 
265  zmq_term(zmqContext);
266  zmqContext = NULL;
267 
268  _hasZmqConnection = false;
269 }
270 
275 {
276  return _hasZmqConnection;
277 }
278 
283 {
284  return controlQueue;
285 }
286 
290 void VuoRuntimeCommunicator::sendControlReply(enum VuoControlReply reply, zmq_msg_t *messages, unsigned int messageCount)
291 {
292  vuoSend("VuoControl",zmqControl,reply,messages,messageCount,false,NULL);
293 }
294 
298 void VuoRuntimeCommunicator::sendTelemetry(enum VuoTelemetry type, zmq_msg_t *messages, unsigned int messageCount)
299 {
301  if (!zmqTelemetry)
302  return;
303 
304  dispatch_sync(telemetryQueue, ^{
306  vuoSend("VuoTelemetry",zmqTelemetry,type,messages,messageCount,true,NULL);
307  });
308 }
309 
315 void VuoRuntimeCommunicator::sendNodeExecutionStarted(const char *compositionIdentifier, const char *nodeIdentifier)
316 {
317  if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier)))
318  return;
319 
320  zmq_msg_t messages[2];
322  vuoInitMessageWithString(&messages[1], nodeIdentifier);
323 
324  sendTelemetry(VuoTelemetryNodeExecutionStarted, messages, 2);
325 }
326 
333 {
334  if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier)))
335  return;
336 
337  zmq_msg_t messages[2];
339  vuoInitMessageWithString(&messages[1], nodeIdentifier);
340 
341  sendTelemetry(VuoTelemetryNodeExecutionFinished, messages, 2);
342 }
343 
349 void VuoRuntimeCommunicator::sendInputPortsUpdated(const char *compositionIdentifier, const char *portIdentifier, bool receivedEvent, bool receivedData, const char *portDataSummary)
350 {
351  bool isSendingAllTelemetry = isSubscribedToAllTelemetry(compositionIdentifier);
352  bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
353  if (! (isSendingAllTelemetry || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
354  return;
355 
356  zmq_msg_t messages[5];
358  vuoInitMessageWithString(&messages[1], portIdentifier);
359  vuoInitMessageWithBool(&messages[2], receivedEvent);
360  vuoInitMessageWithBool(&messages[3], receivedData);
361  vuoInitMessageWithString(&messages[4], (portDataSummary && (isSendingAllTelemetry || isSendingPortTelemetry)) ? portDataSummary : "");
362 
363  sendTelemetry(VuoTelemetryInputPortsUpdated, messages, 5);
364 }
365 
371 void VuoRuntimeCommunicator::sendOutputPortsUpdated(const char *compositionIdentifier, const char *portIdentifier, bool sentEvent, bool sentData, const char *portDataSummary)
372 {
373  bool isSendingAllTelemetry = isSubscribedToAllTelemetry(compositionIdentifier);
374  bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
375  if (! (isSendingAllTelemetry || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
376  return;
377 
378  zmq_msg_t messages[5];
380  vuoInitMessageWithString(&messages[1], portIdentifier);
381  vuoInitMessageWithBool(&messages[2], sentEvent);
382  vuoInitMessageWithBool(&messages[3], sentData);
383  vuoInitMessageWithString(&messages[4], (portDataSummary && (isSendingAllTelemetry || isSendingPortTelemetry)) ? portDataSummary : "");
384 
385  sendTelemetry(VuoTelemetryOutputPortsUpdated, messages, 5);
386 }
387 
391 void VuoRuntimeCommunicator::sendPublishedOutputPortsUpdated(const char *portIdentifier, bool sentData, const char *portDataSummary)
392 {
393  zmq_msg_t messages[3];
394  vuoInitMessageWithString(&messages[0], portIdentifier);
395  vuoInitMessageWithBool(&messages[1], sentData);
396  vuoInitMessageWithString(&messages[2], (portDataSummary ? portDataSummary : ""));
397 
398  sendTelemetry(VuoTelemetryPublishedOutputPortsUpdated, messages, 3);
399 }
400 
406 void VuoRuntimeCommunicator::sendEventFinished(unsigned long eventId, NodeContext *compositionContext)
407 {
408  if (! vuoFinishedExecutingEvent(compositionContext, eventId))
409  return;
410 
411  sendTelemetry(VuoTelemetryEventFinished, NULL, 0);
412 }
413 
419 void VuoRuntimeCommunicator::sendEventDropped(const char *compositionIdentifier, const char *portIdentifier)
420 {
421  bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
422  if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
423  return;
424 
425  zmq_msg_t messages[2];
427  vuoInitMessageWithString(&messages[1], portIdentifier);
428 
429  sendTelemetry(VuoTelemetryEventDropped, messages, 2);
430 }
431 
435 void VuoRuntimeCommunicator::sendError(const char *message)
436 {
437  if (! _hasZmqConnection)
438  return;
439 
440  zmq_msg_t messages[1];
441  vuoInitMessageWithString(&messages[0], message);
442 
443  sendTelemetry(VuoTelemetryError, messages, 1);
444 }
445 
450 {
451  sendTelemetry(VuoTelemetryStopRequested, NULL, 0);
452 }
453 
459 {
460  sendControlReply(VuoControlReplyCompositionStopping, NULL, 0);
461  zmq_close(zmqControl); // wait until message fully sends
462  zmqControl = NULL;
463 }
464 
468 void VuoRuntimeCommunicator::subscribeToPortDataTelemetry(const char *compositionIdentifier, const char *portIdentifer)
469 {
470  portsSendingDataTelemetry[compositionIdentifier].insert(portIdentifer);
471 }
472 
476 void VuoRuntimeCommunicator::unsubscribeFromPortDataTelemetry(const char *compositionIdentifier, const char *portIdentifer)
477 {
478  map<string, set<string> >::iterator iter1 = portsSendingDataTelemetry.find(compositionIdentifier);
479  if (iter1 != portsSendingDataTelemetry.end())
480  {
481  set<string>::iterator iter2 = iter1->second.find(portIdentifer);
482  if (iter2 != iter1->second.end())
483  {
484  iter1->second.erase(iter2);
485  if (iter1->second.empty())
486  portsSendingDataTelemetry.erase(iter1);
487  }
488  }
489 }
490 
494 bool VuoRuntimeCommunicator::isSubscribedToPortDataTelemetry(const char *compositionIdentifier, const char *portIdentifer)
495 {
496  map<string, set<string> >::iterator iter = portsSendingDataTelemetry.find(compositionIdentifier);
497  if (iter != portsSendingDataTelemetry.end())
498  return (iter->second.find(portIdentifer) != iter->second.end());
499 
500  return false;
501 }
502 
506 void VuoRuntimeCommunicator::subscribeToEventTelemetry(const char *compositionIdentifier)
507 {
508  compositionsSendingEventTelemetry.insert(compositionIdentifier);
509 }
510 
514 void VuoRuntimeCommunicator::unsubscribeFromEventTelemetry(const char *compositionIdentifier)
515 {
516  set<string>::iterator iter = compositionsSendingEventTelemetry.find(compositionIdentifier);
517  if (iter != compositionsSendingEventTelemetry.end())
518  compositionsSendingEventTelemetry.erase(iter);
519 }
520 
524 bool VuoRuntimeCommunicator::isSubscribedToEventTelemetry(const char *compositionIdentifier)
525 {
526  set<string>::iterator iter = compositionsSendingEventTelemetry.find(compositionIdentifier);
527  return (iter != compositionsSendingEventTelemetry.end());
528 }
529 
533 void VuoRuntimeCommunicator::subscribeToAllTelemetry(const char *compositionIdentifier)
534 {
535  compositionsSendingAllTelemetry.insert(compositionIdentifier);
536 }
537 
541 void VuoRuntimeCommunicator::unsubscribeFromAllTelemetry(const char *compositionIdentifier)
542 {
543  set<string>::iterator iter = compositionsSendingAllTelemetry.find(compositionIdentifier);
544  if (iter != compositionsSendingAllTelemetry.end())
545  compositionsSendingAllTelemetry.erase(iter);
546 }
547 
551 bool VuoRuntimeCommunicator::isSubscribedToAllTelemetry(const char *compositionIdentifier)
552 {
553  set<string>::iterator iter = compositionsSendingAllTelemetry.find(compositionIdentifier);
554  return (iter != compositionsSendingAllTelemetry.end());
555 }
556 
563 {
564  return (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier));
565 }
566 
572 char * VuoRuntimeCommunicator::mergeEnumDetails(string type, const char *details)
573 {
574  string allowedValuesFunctionName = type + "_getAllowedValues";
575  typedef void *(*allowedValuesFunctionType)(void);
576  allowedValuesFunctionType allowedValuesFunction = (allowedValuesFunctionType)dlsym(RTLD_SELF, allowedValuesFunctionName.c_str());
577  if (!allowedValuesFunction)
578  return NULL;
579 
580  string getJsonFunctionName = type + "_getJson";
581  typedef json_object *(*getJsonFunctionType)(int64_t);
582  getJsonFunctionType getJsonFunction = (getJsonFunctionType)dlsym(RTLD_SELF, getJsonFunctionName.c_str());
583  if (!getJsonFunction)
584  return NULL;
585 
586  string summaryFunctionName = type + "_getSummary";
587  typedef char *(*summaryFunctionType)(int64_t);
588  summaryFunctionType summaryFunction = (summaryFunctionType)dlsym(RTLD_SELF, summaryFunctionName.c_str());
589  if (!summaryFunction)
590  return NULL;
591 
592  string listCountFunctionName = "VuoListGetCount_" + type;
593  typedef unsigned long (*listCountFunctionType)(void *);
594  listCountFunctionType listCountFunction = (listCountFunctionType)dlsym(RTLD_SELF, listCountFunctionName.c_str());
595  if (!listCountFunction)
596  return NULL;
597 
598  string listValueFunctionName = "VuoListGetValue_" + type;
599  typedef int64_t (*listValueFunctionType)(void *, unsigned long);
600  listValueFunctionType listValueFunction = (listValueFunctionType)dlsym(RTLD_SELF, listValueFunctionName.c_str());
601  if (!listValueFunction)
602  return NULL;
603 
604  json_object *detailsJson = json_tokener_parse(details);
605  if (!detailsJson)
606  return NULL;
607 
608  void *allowedValues = allowedValuesFunction();
609  VuoRetain(allowedValues);
610  unsigned long listCount = listCountFunction(allowedValues);
611  json_object *menuItems = json_object_new_array();
612  for (unsigned long i = 1; i <= listCount; ++i)
613  {
614  int64_t value = listValueFunction(allowedValues, i);
615  json_object *js = getJsonFunction(value);
616  if (!json_object_is_type(js, json_type_string))
617  continue;
618  const char *key = json_object_get_string(js);
619  char *summary = summaryFunction(value);
620 
621  json_object *menuItem = json_object_new_object();
622  json_object_object_add(menuItem, "value", json_object_new_string(key));
623  json_object_object_add(menuItem, "name", json_object_new_string(summary));
624  json_object_array_add(menuItems, menuItem);
625 
626  free(summary);
627  }
628  VuoRelease(allowedValues);
629 
630  if (json_object_array_length(menuItems))
631  json_object_object_add(detailsJson, "menuItems", menuItems);
632 
633  char *newDetails = strdup(json_object_to_json_string(detailsJson));
634  json_object_put(detailsJson);
635 
636  return newDetails;
637 }
638 
639 void VuoRuntimeCommunicator::sendHeartbeat(bool blocking)
640 {
641  struct rusage r;
642  if(getrusage(RUSAGE_SELF,&r))
643  {
644  VUserLog("The composition couldn't get the information to send for VuoTelemetryStats : %s", strerror(errno));
645  return;
646  }
647 
648  zmq_msg_t messages[2];
649 
650  {
651  uint64_t utime = r.ru_utime.tv_sec*USEC_PER_SEC+r.ru_utime.tv_usec;
652  zmq_msg_init_size(&messages[0], sizeof utime);
653  memcpy(zmq_msg_data(&messages[0]), &utime, sizeof utime);
654  }
655 
656  {
657  uint64_t stime = r.ru_stime.tv_sec*USEC_PER_SEC+r.ru_stime.tv_usec;
658  zmq_msg_init_size(&messages[1], sizeof stime);
659  memcpy(zmq_msg_data(&messages[1]), &stime, sizeof stime);
660  }
661 
662  if (blocking)
663  // When called with blocking=true, we're already on telemetryQueue.
664  vuoSend("VuoTelemetry", zmqTelemetry, VuoTelemetryStats, messages, 2, false, NULL);
665  else
666  sendTelemetry(VuoTelemetryStats, messages, 2);
667 }
668 
673 {
674  if (! zmqTelemetry)
675  return;
676 
677  dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
678  telemetryTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
679  dispatch_source_set_timer(telemetryTimer, dispatch_walltime(NULL, 0), NSEC_PER_SEC/1000, NSEC_PER_SEC/1000);
680 
681  dispatch_source_set_event_handler(telemetryTimer, ^{
682  sendHeartbeat();
683  });
684 
685  dispatch_source_set_cancel_handler(telemetryTimer, ^{
686  dispatch_semaphore_signal(telemetryCanceled);
687  });
688 
689  dispatch_resume(telemetryTimer);
690 }
691 
696 {
697  if (! zmqTelemetry)
698  return;
699 
700  dispatch_source_cancel(telemetryTimer);
701  dispatch_semaphore_wait(telemetryCanceled, DISPATCH_TIME_FOREVER);
702  dispatch_sync(telemetryQueue, ^{
703  // zmq_close calls POSIX close(), whose documentation says "queued data are discarded".
704  // Since telemetry uses non-blocking sends, issue one final _blocking_ send
705  // to ensure that the queue is drained before we close.
706  sendHeartbeat(true);
707 
708  zmq_close(zmqTelemetry);
709  zmqTelemetry = NULL;
710  });
711 
712  dispatch_release(telemetryTimer);
713  telemetryTimer = NULL;
714 }
715 
720 {
721  if (! zmqControl)
722  return;
723 
724  controlTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, controlQueue);
725  dispatch_source_set_timer(controlTimer, dispatch_walltime(NULL,0), NSEC_PER_SEC/1000, NSEC_PER_SEC/1000);
726 
727  dispatch_source_set_event_handler(controlTimer, ^{
728 
730 
731  zmq_pollitem_t items[]=
732  {
733  {zmqControl,0,ZMQ_POLLIN,0},
734  {zmqSelfReceive,0,ZMQ_POLLIN,0},
735  };
736  int itemCount = 2;
737  long timeout = -1; // Wait forever (VuoStopComposition will send a message ZMQSelfReceive when it's time to stop).
738  zmq_poll(items,itemCount,timeout);
739  if(!(items[0].revents & ZMQ_POLLIN))
740  return;
741 
742  enum VuoControlRequest control = (enum VuoControlRequest) vuoReceiveInt(zmqControl, NULL);
743  VuoCompositionState compositionState = { (void *)persistentState->runtimeState, "" };
744 
745  switch (control)
746  {
748  {
749  dispatch_source_set_timer(telemetryTimer, dispatch_walltime(NULL, 0), NSEC_PER_SEC/2, NSEC_PER_SEC/100);
750 
751  sendControlReply(VuoControlReplyHeartbeatSlowed,NULL,0);
752  break;
753  }
755  {
756  int timeoutInSeconds = vuoReceiveInt(zmqControl, NULL);
757  bool isBeingReplaced = vuoReceiveBool(zmqControl, NULL);
758  bool isLastEverInProcess = vuoReceiveBool(zmqControl, NULL);
759 
760  persistentState->runtimeState->stopCompositionAsOrderedByRunner(isBeingReplaced, timeoutInSeconds, isLastEverInProcess);
761 
762  sendControlReply(VuoControlReplyCompositionStopping,NULL,0);
763  break;
764  }
766  {
767  persistentState->runtimeState->pauseComposition();
768 
769  sendControlReply(VuoControlReplyCompositionPaused,NULL,0);
770  break;
771  }
773  {
774  persistentState->runtimeState->unpauseComposition();
775 
776  sendControlReply(VuoControlReplyCompositionUnpaused,NULL,0);
777  break;
778  }
781  {
782  bool shouldUseInterprocessSerialization = vuoReceiveBool(zmqControl, NULL);
783  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
784  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
785 
786  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
787  char *valueAsString = persistentState->nodeRegistry->getPortValue(&compositionState, portIdentifier, shouldUseInterprocessSerialization);
788 
789  zmq_msg_t messages[1];
790  vuoInitMessageWithString(&messages[0], valueAsString);
791  sendControlReply(control == VuoControlRequestInputPortValueRetrieve ?
793  messages,1);
794 
795  free(compositionIdentifier);
796  free(portIdentifier);
797  free(valueAsString);
798  break;
799  }
802  {
803  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
804  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
805 
806  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
807  char *summary = persistentState->nodeRegistry->getPortSummary(&compositionState, portIdentifier);
808 
809  zmq_msg_t messages[1];
810  vuoInitMessageWithString(&messages[0], summary);
811  sendControlReply(control == VuoControlRequestInputPortSummaryRetrieve ?
813  messages,1);
814 
815  free(compositionIdentifier);
816  free(portIdentifier);
817  free(summary);
818  break;
819  }
821  {
822  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
823  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
824 
825  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
826  persistentState->nodeRegistry->fireTriggerPortEvent(&compositionState, portIdentifier);
827 
828  sendControlReply(VuoControlReplyTriggerPortFiredEvent,NULL,0);
829 
830  free(compositionIdentifier);
831  free(portIdentifier);
832  break;
833  }
835  {
836  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
837  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
838  char *valueAsString = vuoReceiveAndCopyString(zmqControl, NULL);
839 
840  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
841  persistentState->nodeRegistry->setPortValue(&compositionState, portIdentifier, valueAsString);
842 
843  sendControlReply(VuoControlReplyInputPortValueModified,NULL,0);
844 
845  free(compositionIdentifier);
846  free(portIdentifier);
847  free(valueAsString);
848  break;
849  }
851  {
852  int count = getPublishedInputPortCount();
853  char **names = getPublishedInputPortNames();
854 
855  zmq_msg_t messages[count];
856  for (int i = 0; i < count; ++i)
857  vuoInitMessageWithString(&messages[i], names[i]);
858 
859  sendControlReply(VuoControlReplyPublishedInputPortNamesRetrieved,messages,count);
860  break;
861  }
863  {
864  int count = getPublishedOutputPortCount();
865  char **names = getPublishedOutputPortNames();
866 
867  zmq_msg_t messages[count];
868  for (int i = 0; i < count; ++i)
869  vuoInitMessageWithString(&messages[i], names[i]);
870 
871  sendControlReply(VuoControlReplyPublishedOutputPortNamesRetrieved,messages,count);
872  break;
873  }
875  {
876  int count = getPublishedInputPortCount();
877  char **names = getPublishedInputPortTypes();
878 
879  zmq_msg_t messages[count];
880  for (int i = 0; i < count; ++i)
881  vuoInitMessageWithString(&messages[i], names[i]);
882 
883  sendControlReply(VuoControlReplyPublishedInputPortTypesRetrieved,messages,count);
884  break;
885  }
887  {
888  int count = getPublishedOutputPortCount();
889  char **names = getPublishedOutputPortTypes();
890 
891  zmq_msg_t messages[count];
892  for (int i = 0; i < count; ++i)
893  vuoInitMessageWithString(&messages[i], names[i]);
894 
895  sendControlReply(VuoControlReplyPublishedOutputPortTypesRetrieved,messages,count);
896  break;
897  }
899  {
900  int count = getPublishedInputPortCount();
901  char **types = getPublishedInputPortTypes();
902  char **names = getPublishedInputPortDetails();
903 
904  zmq_msg_t messages[count];
905  for (int i = 0; i < count; ++i)
906  {
907  char *newDetails = mergeEnumDetails(types[i], names[i]);
908  if (newDetails)
909  names[i] = newDetails;
910  vuoInitMessageWithString(&messages[i], names[i]);
911  if (newDetails)
912  free(newDetails);
913  }
914 
915  sendControlReply(VuoControlReplyPublishedInputPortDetailsRetrieved,messages,count);
916  break;
917  }
919  {
920  int count = getPublishedOutputPortCount();
921  char **names = getPublishedOutputPortDetails();
922 
923  zmq_msg_t messages[count];
924  for (int i = 0; i < count; ++i)
925  vuoInitMessageWithString(&messages[i], names[i]);
926 
927  sendControlReply(VuoControlReplyPublishedOutputPortDetailsRetrieved,messages,count);
928  break;
929  }
931  {
932  int count = vuoReceiveInt(zmqControl, NULL);
933 
934  char **names = (char **)malloc(count * sizeof(char *));
935  for (int i = 0; i < count; ++i)
936  names[i] = vuoReceiveAndCopyString(zmqControl, NULL);
937 
938  firePublishedInputPortEvent(names, count);
939 
940  for (int i = 0; i < count; ++i)
941  free(names[i]);
942  free(names);
943 
944  sendControlReply(VuoControlReplyPublishedInputPortFiredEvent,NULL,0);
945  break;
946  }
948  {
950 
951  while (VuoTelemetry_hasMoreToReceive(zmqControl))
952  {
953  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
954  char *valueAsString = vuoReceiveAndCopyString(zmqControl, NULL);
955 
956  setPublishedInputPortValue(portIdentifier, valueAsString);
957 
958  free(portIdentifier);
959  free(valueAsString);
960  }
961 
962  sendControlReply(VuoControlReplyPublishedInputPortValueModified,NULL,0);
963 
965  break;
966  }
968  {
970 
971  bool shouldUseInterprocessSerialization = vuoReceiveBool(zmqControl, NULL);
972  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
973  char *valueAsString = getPublishedInputPortValue(portIdentifier, shouldUseInterprocessSerialization);
974  free(portIdentifier);
975  zmq_msg_t messages[1];
976  vuoInitMessageWithString(&messages[0], valueAsString);
977  free(valueAsString);
978  sendControlReply(VuoControlReplyPublishedInputPortValueRetrieved,messages,1);
979 
981  break;
982  }
984  {
986 
987  bool shouldUseInterprocessSerialization = vuoReceiveBool(zmqControl, NULL);
988  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
989  char *valueAsString = getPublishedOutputPortValue(portIdentifier, shouldUseInterprocessSerialization);
990  free(portIdentifier);
991  zmq_msg_t messages[1];
992  vuoInitMessageWithString(&messages[0], valueAsString);
993  free(valueAsString);
994  sendControlReply(VuoControlReplyPublishedOutputPortValueRetrieved,messages,1);
995 
997  break;
998  }
1001  {
1002  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1003  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1004 
1005  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1006  subscribeToPortDataTelemetry(compositionState.compositionIdentifier, portIdentifier);
1007 
1008  char *summary = persistentState->nodeRegistry->getPortSummary(&compositionState, portIdentifier);
1009 
1010  zmq_msg_t messages[1];
1011  vuoInitMessageWithString(&messages[0], summary);
1012  sendControlReply(control == VuoControlRequestInputPortTelemetrySubscribe ?
1014  messages,1);
1015 
1016  free(compositionIdentifier);
1017  free(portIdentifier);
1018  free(summary);
1019  break;
1020  }
1023  {
1024  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1025  char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1026 
1027  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1028  unsubscribeFromPortDataTelemetry(compositionState.compositionIdentifier, portIdentifier);
1029 
1030  sendControlReply(control == VuoControlRequestInputPortTelemetryUnsubscribe ?
1032  NULL,0);
1033 
1034  free(compositionIdentifier);
1035  free(portIdentifier);
1036  break;
1037  }
1039  {
1040  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1041 
1042  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1043  subscribeToEventTelemetry(compositionState.compositionIdentifier);
1044 
1045  sendControlReply(VuoControlReplyEventTelemetrySubscribed,NULL,0);
1046 
1047  free(compositionIdentifier);
1048  break;
1049  }
1051  {
1052  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1053 
1054  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1055  unsubscribeFromEventTelemetry(compositionState.compositionIdentifier);
1056 
1057  sendControlReply(VuoControlReplyEventTelemetryUnsubscribed,NULL,0);
1058 
1059  free(compositionIdentifier);
1060  break;
1061  }
1063  {
1064  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1065 
1066  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1067  subscribeToAllTelemetry(compositionState.compositionIdentifier);
1068 
1069  sendControlReply(VuoControlReplyAllTelemetrySubscribed,NULL,0);
1070 
1071  free(compositionIdentifier);
1072  break;
1073  }
1075  {
1076  char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1077 
1078  compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1079  unsubscribeFromAllTelemetry(compositionState.compositionIdentifier);
1080 
1081  sendControlReply(VuoControlReplyAllTelemetryUnsubscribed,NULL,0);
1082 
1083  free(compositionIdentifier);
1084  break;
1085  }
1086  }
1087  });
1088 
1089  dispatch_source_set_cancel_handler(controlTimer, ^{
1090 
1091  persistentState->runtimeState->breakOutOfEventLoop();
1092  dispatch_semaphore_signal(controlCanceled);
1093 
1094  });
1095 
1096  dispatch_resume(controlTimer);
1097 }
1098 
1103 {
     // NOTE(review): the signature line for this body (listing line 1102) is not present in this listing.
     // Requests cancellation of the control timer; a no-op when there is no control socket.
1104  if (! zmqControl)
1105  return;
1106 
     // dispatch_source_cancel() only requests cancellation. The cancel handler registered on
     // controlTimer earlier in this file is what breaks out of the event loop and signals controlCanceled.
1107  dispatch_source_cancel(controlTimer);
1108 }
1109 
1114 {
     // NOTE(review): the signature line for this body (listing line 1113) is not present in this listing.
     // Sends a single zero byte on the zmqSelfSend socket; a no-op when that socket has not been created.
     // NOTE(review): presumably the matching zmqSelfReceive socket is polled by the control loop so that
     // this message wakes it up — confirm against the polling code.
1115  if (! zmqSelfSend)
1116  return;
1117 
     // Build a 1-byte message whose payload is the value 0.
1118  char z = 0;
1119  zmq_msg_t message;
1120  zmq_msg_init_size(&message, sizeof z);
1121  memcpy(zmq_msg_data(&message), &z, sizeof z);
     // Any nonzero return from zmq_send() is treated as a failure and logged with errno; it is not retried.
1122  if (zmq_send(zmqSelfSend, &message, 0) != 0)
1123  VUserLog("Couldn't break: %s (%d)", zmq_strerror(errno), errno);
     // The message is closed whether or not the send succeeded.
1124  zmq_msg_close(&message);
1125 }
1126 
1131 {
     // NOTE(review): the signature line for this body (listing line 1130) is not present in this listing.
     // Tears down the control channel: waits for the control timer's cancellation to complete, then
     // closes the control socket and the two self-messaging sockets. A no-op when zmqControl is NULL.
1132  if (! zmqControl)
1133  return;
1134 
     // Blocks indefinitely until controlCanceled is signaled (the control timer's cancel handler signals it).
1135  dispatch_semaphore_wait(controlCanceled, DISPATCH_TIME_FOREVER);
     // Close the control socket on controlQueue, synchronously, so the close is serialized with
     // any other work submitted to that queue.
1136  dispatch_sync(controlQueue, ^{
1137  zmq_close(zmqControl);
1138  zmqControl = NULL;
1139  });
1140 
     // The timer source is released only after its cancellation has been observed above.
1141  dispatch_release(controlTimer);
1142  controlTimer = NULL;
1143 
     // Close the self-send / self-receive sockets if they exist, and clear the pointers.
1144  if (zmqSelfSend)
1145  {
1146  zmq_close(zmqSelfSend);
1147  zmqSelfSend = NULL;
1148  }
1149 
1150  if (zmqSelfReceive)
1151  {
1152  zmq_close(zmqSelfReceive);
1153  zmqSelfReceive = NULL;
1154  }
1155 }
1156 
1162 {
     // NOTE(review): the signature line for this body (listing line 1161) is not present in this listing.
     // Starts a background task that reads from runnerPipe and, once the read stops reporting EAGAIN,
     // marks the ZMQ connection as gone. A no-op when runnerPipe is negative (the constructor sets it to -1).
1163  if (runnerPipe < 0)
1164  return;
1165 
     // Runs on the default-priority global queue so the caller is not blocked.
1166  dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
1167  dispatch_async(queue, ^{
1168  char buf[1];
1169  int ret;
1170  int readError;
     // Keep retrying a 1-byte read while it fails with EAGAIN. errno is copied into readError
     // immediately after read() so that the following usleep() cannot overwrite it.
     // The sleep (USEC_PER_SEC / 100 microseconds) runs after every read, including the last one.
1171  do {
1172  ret = read(runnerPipe, &buf, 1);
1173  readError = errno;
1174  usleep(USEC_PER_SEC / 100);
1175  } while (ret == -1 && readError == EAGAIN);
1176 
     // Reached for any outcome other than EAGAIN: data read, end-of-file (0), or a different error.
1177  _hasZmqConnection = false;
     // NOTE(review): listing line 1178 is absent here; there may be an additional statement in the real file — confirm.
1179  });
1180 }
1181 
1182 extern "C"
1183 {
/**
 * C-linkage wrapper around VuoRuntimeCommunicator::sendNodeExecutionStarted().
 *
 * @param compositionState Supplies the runtime state (cast to VuoRuntimeState) and the composition identifier.
 * @param nodeIdentifier Passed through unchanged to the communicator.
 */
1187 void vuoSendNodeExecutionStarted(VuoCompositionState *compositionState, const char *nodeIdentifier)
1188 {
1189  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
     // `return` of a void expression: simply forwards the call.
1190  return runtimeState->persistentState->communicator->sendNodeExecutionStarted(compositionState->compositionIdentifier, nodeIdentifier);
1191 }
1192 
/**
 * C-linkage wrapper around VuoRuntimeCommunicator::sendNodeExecutionFinished().
 *
 * @param compositionState Supplies the runtime state (cast to VuoRuntimeState) and the composition identifier.
 * @param nodeIdentifier Passed through unchanged to the communicator.
 */
1196 void vuoSendNodeExecutionFinished(VuoCompositionState *compositionState, const char *nodeIdentifier)
1197 {
1198  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
     // `return` of a void expression: simply forwards the call.
1199  return runtimeState->persistentState->communicator->sendNodeExecutionFinished(compositionState->compositionIdentifier, nodeIdentifier);
1200 }
1201 
/**
 * C-linkage wrapper around VuoRuntimeCommunicator::sendInputPortsUpdated().
 *
 * @param compositionState Supplies the runtime state (cast to VuoRuntimeState) and the composition identifier.
 * @param portIdentifier Passed through unchanged.
 * @param receivedEvent Passed through unchanged.
 * @param receivedData Passed through unchanged.
 * @param portDataSummary Passed through unchanged.
 */
1205 void vuoSendInputPortsUpdated(VuoCompositionState *compositionState, const char *portIdentifier, bool receivedEvent, bool receivedData, const char *portDataSummary)
1206 {
1207  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
     // `return` of a void expression: simply forwards the call.
1208  return runtimeState->persistentState->communicator->sendInputPortsUpdated(compositionState->compositionIdentifier, portIdentifier, receivedEvent, receivedData, portDataSummary);
1209 }
1210 
/**
 * C-linkage wrapper around VuoRuntimeCommunicator::sendOutputPortsUpdated().
 *
 * @param compositionState Supplies the runtime state (cast to VuoRuntimeState) and the composition identifier.
 * @param portIdentifier Passed through unchanged.
 * @param sentEvent Passed through unchanged.
 * @param sentData Passed through unchanged.
 * @param portDataSummary Passed through unchanged.
 */
1216 void vuoSendOutputPortsUpdated(VuoCompositionState *compositionState, const char *portIdentifier, bool sentEvent, bool sentData, const char *portDataSummary)
1217 {
1218  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
     // `return` of a void expression: simply forwards the call.
1219  return runtimeState->persistentState->communicator->sendOutputPortsUpdated(compositionState->compositionIdentifier, portIdentifier, sentEvent, sentData, portDataSummary);
1220 }
1221 
/**
 * C-linkage wrapper around VuoRuntimeCommunicator::sendPublishedOutputPortsUpdated().
 *
 * Unlike the other port-update wrappers in this file, the composition identifier is not forwarded;
 * compositionState is used only to reach the runtime state.
 *
 * @param compositionState Supplies the runtime state (cast to VuoRuntimeState).
 * @param portIdentifier Passed through unchanged.
 * @param sentData Passed through unchanged.
 * @param portDataSummary Passed through unchanged.
 */
1225 void vuoSendPublishedOutputPortsUpdated(VuoCompositionState *compositionState, const char *portIdentifier, bool sentData, const char *portDataSummary)
1226 {
1227  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
     // `return` of a void expression: simply forwards the call.
1228  return runtimeState->persistentState->communicator->sendPublishedOutputPortsUpdated(portIdentifier, sentData, portDataSummary);
1229 }
1230 
/**
 * C-linkage wrapper around VuoRuntimeCommunicator::sendEventFinished().
 *
 * @param compositionState Supplies the runtime state (cast to VuoRuntimeState).
 * @param eventId Passed through unchanged as the first argument.
 */
1236 void vuoSendEventFinished(VuoCompositionState *compositionState, unsigned long eventId)
1237 {
1238  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
     // NOTE(review): `compositionContext` is not declared in the visible lines; listing line 1239 is
     // absent, so it is presumably declared there — confirm against the real file.
1240  return runtimeState->persistentState->communicator->sendEventFinished(eventId, compositionContext);
1241 }
1242 
/**
 * C-linkage wrapper around VuoRuntimeCommunicator::sendEventDropped().
 *
 * @param compositionState Supplies the runtime state (cast to VuoRuntimeState) and the composition identifier.
 * @param portIdentifier Passed through unchanged to the communicator.
 */
1246 void vuoSendEventDropped(VuoCompositionState *compositionState, const char *portIdentifier)
1247 {
1248  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
     // `return` of a void expression: simply forwards the call.
1249  return runtimeState->persistentState->communicator->sendEventDropped(compositionState->compositionIdentifier, portIdentifier);
1250 }
1251 
/**
 * C-linkage entry point for reporting an error message from a composition.
 *
 * @param compositionState Supplies the runtime state (cast to VuoRuntimeState).
 * @param message The error text. It is not used by any line visible in this listing.
 */
1255 void vuoSendError(VuoCompositionState *compositionState, const char *message)
1256 {
1257  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
     // NOTE(review): listing line 1258 is absent; presumably it forwards `message` through
     // runtimeState to the communicator — confirm against the real file.
1259 }
1260 
/**
 * C-linkage wrapper around VuoRuntimeCommunicator::shouldSendPortDataTelemetry().
 *
 * @param compositionState Supplies the runtime state (cast to VuoRuntimeState) and the composition identifier.
 * @param portIdentifier Passed through unchanged to the communicator.
 * @return Whatever the communicator's shouldSendPortDataTelemetry() returns for this composition and port.
 */
1264 bool vuoShouldSendPortDataTelemetry(VuoCompositionState *compositionState, const char *portIdentifier)
1265 {
1266  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1267  return runtimeState->persistentState->communicator->shouldSendPortDataTelemetry(compositionState->compositionIdentifier, portIdentifier);
1268 }
1269 
/**
 * C-linkage wrapper that returns a port's value as a string by calling the node registry's getPortValue().
 *
 * @param compositionState Supplies the runtime state (cast to VuoRuntimeState) and is also passed to getPortValue().
 * @param portIdentifier Passed through unchanged.
 * @param shouldUseInterprocessSerialization Passed through unchanged.
 * @return The string returned by getPortValue().
 *         NOTE(review): ownership of the returned string is not shown here; presumably the caller frees it — confirm.
 */
1273 char * vuoGetInputPortString(VuoCompositionState *compositionState, const char *portIdentifier, bool shouldUseInterprocessSerialization)
1274 {
1275  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1276  return runtimeState->persistentState->nodeRegistry->getPortValue(compositionState, portIdentifier, shouldUseInterprocessSerialization);
1277 }
1278 
/**
 * C-linkage wrapper that returns a port's value as a string by calling the node registry's getPortValue().
 *
 * The body makes the same getPortValue() call as vuoGetInputPortString(); no input/output distinction is made here.
 *
 * @param compositionState Supplies the runtime state (cast to VuoRuntimeState) and is also passed to getPortValue().
 * @param portIdentifier Passed through unchanged.
 * @param shouldUseInterprocessSerialization Passed through unchanged.
 * @return The string returned by getPortValue().
 *         NOTE(review): ownership of the returned string is not shown here; presumably the caller frees it — confirm.
 */
1282 char * vuoGetOutputPortString(VuoCompositionState *compositionState, const char *portIdentifier, bool shouldUseInterprocessSerialization)
1283 {
1284  VuoRuntimeState *runtimeState = (VuoRuntimeState *)compositionState->runtimeState;
1285  return runtimeState->persistentState->nodeRegistry->getPortValue(compositionState, portIdentifier, shouldUseInterprocessSerialization);
1286 }
1287 
1288 } // extern "C"