Vuo 2.4.4
Loading...
Searching...
No Matches
VuoRuntimeCommunicator.cc
Go to the documentation of this file.
1
11
12#include <dlfcn.h>
13#include <sstream>
14#include "VuoEventLoop.h"
15#include "VuoException.hh"
16#include "VuoHeap.h"
17#include "VuoNodeRegistry.hh"
19#include "VuoRuntimeState.hh"
20
25{
26 this->persistentState = persistentState;
27
28 _hasZmqConnection = false;
29
30 zmqContext = NULL;
31 zmqControl = NULL;
32 zmqSelfReceive = NULL;
33 zmqSelfSend = NULL;
34 zmqTelemetry = NULL;
35
36 controlQueue = dispatch_queue_create("org.vuo.runtime.control", NULL);
37 telemetryQueue = dispatch_queue_create("org.vuo.runtime.telemetry", NULL);
38 controlTimer = NULL;
39 telemetryTimer = NULL;
40 controlCanceled = dispatch_semaphore_create(0);
41 telemetryCanceled = dispatch_semaphore_create(0);
42
43 runnerPipe = -1;
44
45 vuoInstanceInit = NULL;
46 vuoInstanceTriggerStart = NULL;
47 vuoInstanceTriggerStop = NULL;
48 vuoSetInputPortValue = NULL;
49 getPublishedInputPortCount = NULL;
50 getPublishedOutputPortCount = NULL;
51 getPublishedInputPortNames = NULL;
52 getPublishedOutputPortNames = NULL;
53 getPublishedInputPortTypes = NULL;
54 getPublishedOutputPortTypes = NULL;
55 getPublishedInputPortDetails = NULL;
56 getPublishedOutputPortDetails = NULL;
57 firePublishedInputPortEvent = NULL;
58 setPublishedInputPortValue = NULL;
59 getPublishedInputPortValue = NULL;
60 getPublishedOutputPortValue = NULL;
61}
62
67{
68 dispatch_release(controlQueue);
69 dispatch_release(telemetryQueue);
70 dispatch_release(controlCanceled);
71 dispatch_release(telemetryCanceled);
72}
73
79void VuoRuntimeCommunicator::updateCompositionSymbols(void *compositionBinaryHandle)
80{
81 ostringstream errorMessage;
82
83 vuoInstanceInit = (vuoInstanceInitType) dlsym(compositionBinaryHandle, "vuoInstanceInit");
84 if (! vuoInstanceInit)
85 {
86 errorMessage << "The composition couldn't be started because its vuoInstanceInit() function couldn't be found : " << dlerror();
87 throw VuoException(errorMessage.str());
88 }
89
90 vuoInstanceTriggerStart = (vuoInstanceTriggerStartType) dlsym(compositionBinaryHandle, "vuoInstanceTriggerStart");
91 if (! vuoInstanceTriggerStart)
92 {
93 errorMessage << "The composition couldn't be started because its vuoInstanceTriggerStart() function couldn't be found : " << dlerror();
94 throw VuoException(errorMessage.str());
95 }
96
97 vuoInstanceTriggerStop = (vuoInstanceTriggerStopType) dlsym(compositionBinaryHandle, "vuoInstanceTriggerStop");
98 if (! vuoInstanceTriggerStop)
99 {
100 errorMessage << "The composition couldn't be started because its vuoInstanceTriggerStop() function couldn't be found : " << dlerror();
101 throw VuoException(errorMessage.str());
102 }
103
104 vuoSetInputPortValue = (vuoSetInputPortValueType) dlsym(compositionBinaryHandle, "vuoSetInputPortValue");
105 if (! vuoSetInputPortValue)
106 {
107 errorMessage << "The composition couldn't be started because its vuoSetInputPortValue() function couldn't be found : " << dlerror();
108 throw VuoException(errorMessage.str());
109 }
110
111 getPublishedInputPortCount = (getPublishedInputPortCountType) dlsym(compositionBinaryHandle, "getPublishedInputPortCount");
112 if (! getPublishedInputPortCount)
113 {
114 errorMessage << "The composition couldn't be started because its getPublishedInputPortCount() function couldn't be found : " << dlerror();
115 throw VuoException(errorMessage.str());
116 }
117
118 getPublishedOutputPortCount = (getPublishedOutputPortCountType) dlsym(compositionBinaryHandle, "getPublishedOutputPortCount");
119 if (! getPublishedOutputPortCount)
120 {
121 errorMessage << "The composition couldn't be started because its getPublishedOutputPortCount() function couldn't be found : " << dlerror();
122 throw VuoException(errorMessage.str());
123 }
124
125 getPublishedInputPortNames = (getPublishedInputPortNamesType) dlsym(compositionBinaryHandle, "getPublishedInputPortNames");
126 if (! getPublishedInputPortNames)
127 {
128 errorMessage << "The composition couldn't be started because its getPublishedInputPortNames() function couldn't be found : " << dlerror();
129 throw VuoException(errorMessage.str());
130 }
131
132 getPublishedOutputPortNames = (getPublishedOutputPortNamesType) dlsym(compositionBinaryHandle, "getPublishedOutputPortNames");
133 if (! getPublishedOutputPortNames)
134 {
135 errorMessage << "The composition couldn't be started because its getPublishedOutputPortNames() function couldn't be found : " << dlerror();
136 throw VuoException(errorMessage.str());
137 }
138
139 getPublishedInputPortTypes = (getPublishedInputPortTypesType) dlsym(compositionBinaryHandle, "getPublishedInputPortTypes");
140 if (! getPublishedInputPortTypes)
141 {
142 errorMessage << "The composition couldn't be started because its getPublishedInputPortTypes() function couldn't be found : " << dlerror();
143 throw VuoException(errorMessage.str());
144 }
145
146 getPublishedOutputPortTypes = (getPublishedOutputPortTypesType) dlsym(compositionBinaryHandle, "getPublishedOutputPortTypes");
147 if (! getPublishedOutputPortTypes)
148 {
149 errorMessage << "The composition couldn't be started because its getPublishedOutputPortTypes() function couldn't be found : " << dlerror();
150 throw VuoException(errorMessage.str());
151 }
152
153 getPublishedInputPortDetails = (getPublishedInputPortDetailsType) dlsym(compositionBinaryHandle, "getPublishedInputPortDetails");
154 if (! getPublishedInputPortDetails)
155 {
156 errorMessage << "The composition couldn't be started because its getPublishedInputPortDetails() function couldn't be found : " << dlerror();
157 throw VuoException(errorMessage.str());
158 }
159
160 getPublishedOutputPortDetails = (getPublishedOutputPortDetailsType) dlsym(compositionBinaryHandle, "getPublishedOutputPortDetails");
161 if (! getPublishedOutputPortDetails)
162 {
163 errorMessage << "The composition couldn't be started because its getPublishedOutputPortDetails() function couldn't be found : " << dlerror();
164 throw VuoException(errorMessage.str());
165 }
166
167 firePublishedInputPortEvent = (firePublishedInputPortEventType) dlsym(compositionBinaryHandle, "firePublishedInputPortEvent");
168 if (! firePublishedInputPortEvent)
169 {
170 errorMessage << "The composition couldn't be started because its firePublishedInputPortEvent() function couldn't be found : " << dlerror();
171 throw VuoException(errorMessage.str());
172 }
173
174 setPublishedInputPortValue = (setPublishedInputPortValueType) dlsym(compositionBinaryHandle, "setPublishedInputPortValue");
175 if (! setPublishedInputPortValue)
176 {
177 errorMessage << "The composition couldn't be started because its setPublishedInputPortValue() function couldn't be found : " << dlerror();
178 throw VuoException(errorMessage.str());
179 }
180
181 getPublishedInputPortValue = (getPublishedInputPortValueType) dlsym(compositionBinaryHandle, "getPublishedInputPortValue");
182 if (! getPublishedInputPortValue)
183 {
184 errorMessage << "The composition couldn't be started because its getPublishedInputPortValue() function couldn't be found : " << dlerror();
185 throw VuoException(errorMessage.str());
186 }
187
188 getPublishedOutputPortValue = (getPublishedOutputPortValueType) dlsym(compositionBinaryHandle, "getPublishedOutputPortValue");
189 if (! getPublishedOutputPortValue)
190 {
191 errorMessage << "The composition couldn't be started because its getPublishedOutputPortValue() function couldn't be found : " << dlerror();
192 throw VuoException(errorMessage.str());
193 }
194}
195
201void VuoRuntimeCommunicator::openConnection(void *_zmqContext, const char *controlURL, const char *telemetryURL, int runnerPipe)
202{
203 this->runnerPipe = runnerPipe;
204
206 {
207 _hasZmqConnection = true;
208 zmqContext = (_zmqContext ? _zmqContext : zmq_init(1));
209
210 zmqControl = zmq_socket(zmqContext,ZMQ_REP);
211 zmqTelemetry = zmq_socket(zmqContext,ZMQ_PUB);
212
213 bool error = false;
214 ostringstream errorMessage;
215
216 if(zmq_bind(zmqControl,controlURL))
217 {
218 errorMessage << "The composition couldn't start because it couldn't establish communication to be controlled by the runner : " << zmq_strerror(errno) << endl;
219 error = true;
220 }
221
222 zmqSelfReceive = zmq_socket(zmqContext, ZMQ_PAIR);
223 if (zmq_bind(zmqSelfReceive, "inproc://vuo-runtime-self") != 0)
224 {
225 errorMessage << "Couldn't bind self-receive socket: " << zmq_strerror(errno) << " " << errno << endl;
226 error = true;
227 }
228
229 zmqSelfSend = zmq_socket(zmqContext, ZMQ_PAIR);
230 if (zmq_connect(zmqSelfSend, "inproc://vuo-runtime-self") != 0)
231 {
232 errorMessage << "Couldn't connect self-send socket: " << zmq_strerror(errno) << " " << errno << endl;
233 error = true;
234 }
235
236 const int highWaterMark = 0; // no limit
237 if(zmq_setsockopt(zmqTelemetry,ZMQ_SNDHWM,&highWaterMark,sizeof(highWaterMark)))
238 {
239 errorMessage << "Couldn't set high-water mark on telemetry socket: " << zmq_strerror(errno) << " " << errno << endl;
240 error = true;
241 }
242
243 if(zmq_bind(zmqTelemetry,telemetryURL))
244 {
245 errorMessage << "The composition couldn't start because it couldn't establish communication to be listened to by the runner : " << zmq_strerror(errno) << endl;
246 error = true;
247 }
248
249 if (error)
250 {
251 zmq_close(zmqControl);
252 zmqControl = NULL;
253 zmq_close(zmqSelfSend);
254 zmqSelfSend = NULL;
255 zmq_close(zmqSelfReceive);
256 zmqSelfReceive = NULL;
257 zmq_close(zmqTelemetry);
258 zmqTelemetry = NULL;
259 throw VuoException(errorMessage.str());
260 }
261 }
262}
263
268{
269 if (! _hasZmqConnection)
270 return;
271
272 zmq_ctx_term(zmqContext);
273 zmqContext = NULL;
274
275 _hasZmqConnection = false;
276}
277
282{
283 return _hasZmqConnection;
284}
285
290{
291 return controlQueue;
292}
293
297void VuoRuntimeCommunicator::sendControlReply(enum VuoControlReply reply, zmq_msg_t *messages, unsigned int messageCount)
298{
299 vuoSend("VuoControl",zmqControl,reply,messages,messageCount,false,NULL);
300}
301
305void VuoRuntimeCommunicator::sendTelemetry(enum VuoTelemetry type, zmq_msg_t *messages, unsigned int messageCount)
306{
308 if (!zmqTelemetry)
309 return;
310
311 dispatch_sync(telemetryQueue, ^{
313 vuoSend("VuoTelemetry",zmqTelemetry,type,messages,messageCount,true,NULL);
314 });
315}
316
322void VuoRuntimeCommunicator::sendNodeExecutionStarted(const char *compositionIdentifier, const char *nodeIdentifier)
323{
324 if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier)))
325 return;
326
327 zmq_msg_t messages[2];
328 vuoInitMessageWithString(&messages[0], compositionIdentifier);
329 vuoInitMessageWithString(&messages[1], nodeIdentifier);
330
331 sendTelemetry(VuoTelemetryNodeExecutionStarted, messages, 2);
332}
333
339void VuoRuntimeCommunicator::sendNodeExecutionFinished(const char *compositionIdentifier, const char *nodeIdentifier)
340{
341 if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier)))
342 return;
343
344 zmq_msg_t messages[2];
345 vuoInitMessageWithString(&messages[0], compositionIdentifier);
346 vuoInitMessageWithString(&messages[1], nodeIdentifier);
347
348 sendTelemetry(VuoTelemetryNodeExecutionFinished, messages, 2);
349}
350
356void VuoRuntimeCommunicator::sendInputPortsUpdated(const char *compositionIdentifier, const char *portIdentifier, bool receivedEvent, bool receivedData, const char *portDataSummary)
357{
358 bool isSendingAllTelemetry = isSubscribedToAllTelemetry(compositionIdentifier);
359 bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
360 if (! (isSendingAllTelemetry || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
361 return;
362
363 zmq_msg_t messages[5];
364 vuoInitMessageWithString(&messages[0], compositionIdentifier);
365 vuoInitMessageWithString(&messages[1], portIdentifier);
366 vuoInitMessageWithBool(&messages[2], receivedEvent);
367 vuoInitMessageWithBool(&messages[3], receivedData);
368 vuoInitMessageWithString(&messages[4], (portDataSummary && (isSendingAllTelemetry || isSendingPortTelemetry)) ? portDataSummary : "");
369
370 sendTelemetry(VuoTelemetryInputPortsUpdated, messages, 5);
371}
372
378void VuoRuntimeCommunicator::sendOutputPortsUpdated(const char *compositionIdentifier, const char *portIdentifier, bool sentEvent, bool sentData, const char *portDataSummary)
379{
380 bool isSendingAllTelemetry = isSubscribedToAllTelemetry(compositionIdentifier);
381 bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
382 if (! (isSendingAllTelemetry || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
383 return;
384
385 zmq_msg_t messages[5];
386 vuoInitMessageWithString(&messages[0], compositionIdentifier);
387 vuoInitMessageWithString(&messages[1], portIdentifier);
388 vuoInitMessageWithBool(&messages[2], sentEvent);
389 vuoInitMessageWithBool(&messages[3], sentData);
390 vuoInitMessageWithString(&messages[4], (portDataSummary && (isSendingAllTelemetry || isSendingPortTelemetry)) ? portDataSummary : "");
391
392 sendTelemetry(VuoTelemetryOutputPortsUpdated, messages, 5);
393}
394
398void VuoRuntimeCommunicator::sendPublishedOutputPortsUpdated(const char *portIdentifier, bool sentData, const char *portDataSummary)
399{
400 zmq_msg_t messages[3];
401 vuoInitMessageWithString(&messages[0], portIdentifier);
402 vuoInitMessageWithBool(&messages[1], sentData);
403 vuoInitMessageWithString(&messages[2], (portDataSummary ? portDataSummary : ""));
404
405 sendTelemetry(VuoTelemetryPublishedOutputPortsUpdated, messages, 3);
406}
407
413void VuoRuntimeCommunicator::sendEventFinished(unsigned long eventId, NodeContext *compositionContext)
414{
415 if (! vuoFinishedExecutingEvent(compositionContext, eventId))
416 return;
417
418 sendTelemetry(VuoTelemetryEventFinished, NULL, 0);
419}
420
426void VuoRuntimeCommunicator::sendEventDropped(const char *compositionIdentifier, const char *portIdentifier)
427{
428 bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
429 if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
430 return;
431
432 zmq_msg_t messages[2];
433 vuoInitMessageWithString(&messages[0], compositionIdentifier);
434 vuoInitMessageWithString(&messages[1], portIdentifier);
435
436 sendTelemetry(VuoTelemetryEventDropped, messages, 2);
437}
438
442void VuoRuntimeCommunicator::sendError(const char *message)
443{
444 if (! _hasZmqConnection)
445 return;
446
447 zmq_msg_t messages[1];
448 vuoInitMessageWithString(&messages[0], message);
449
450 sendTelemetry(VuoTelemetryError, messages, 1);
451}
452
457{
458 sendTelemetry(VuoTelemetryStopRequested, NULL, 0);
459}
460
466{
467 sendControlReply(VuoControlReplyCompositionStopping, NULL, 0);
468 zmq_close(zmqControl); // wait until message fully sends
469 zmqControl = NULL;
470}
471
475void VuoRuntimeCommunicator::subscribeToPortDataTelemetry(const char *compositionIdentifier, const char *portIdentifer)
476{
477 portsSendingDataTelemetry[compositionIdentifier].insert(portIdentifer);
478}
479
483void VuoRuntimeCommunicator::unsubscribeFromPortDataTelemetry(const char *compositionIdentifier, const char *portIdentifer)
484{
485 map<string, set<string> >::iterator iter1 = portsSendingDataTelemetry.find(compositionIdentifier);
486 if (iter1 != portsSendingDataTelemetry.end())
487 {
488 set<string>::iterator iter2 = iter1->second.find(portIdentifer);
489 if (iter2 != iter1->second.end())
490 {
491 iter1->second.erase(iter2);
492 if (iter1->second.empty())
493 portsSendingDataTelemetry.erase(iter1);
494 }
495 }
496}
497
501bool VuoRuntimeCommunicator::isSubscribedToPortDataTelemetry(const char *compositionIdentifier, const char *portIdentifer)
502{
503 map<string, set<string> >::iterator iter = portsSendingDataTelemetry.find(compositionIdentifier);
504 if (iter != portsSendingDataTelemetry.end())
505 return (iter->second.find(portIdentifer) != iter->second.end());
506
507 return false;
508}
509
513void VuoRuntimeCommunicator::subscribeToEventTelemetry(const char *compositionIdentifier)
514{
515 compositionsSendingEventTelemetry.insert(compositionIdentifier);
516}
517
521void VuoRuntimeCommunicator::unsubscribeFromEventTelemetry(const char *compositionIdentifier)
522{
523 set<string>::iterator iter = compositionsSendingEventTelemetry.find(compositionIdentifier);
524 if (iter != compositionsSendingEventTelemetry.end())
525 compositionsSendingEventTelemetry.erase(iter);
526}
527
531bool VuoRuntimeCommunicator::isSubscribedToEventTelemetry(const char *compositionIdentifier)
532{
533 set<string>::iterator iter = compositionsSendingEventTelemetry.find(compositionIdentifier);
534 return (iter != compositionsSendingEventTelemetry.end());
535}
536
540void VuoRuntimeCommunicator::subscribeToAllTelemetry(const char *compositionIdentifier)
541{
542 compositionsSendingAllTelemetry.insert(compositionIdentifier);
543}
544
548void VuoRuntimeCommunicator::unsubscribeFromAllTelemetry(const char *compositionIdentifier)
549{
550 set<string>::iterator iter = compositionsSendingAllTelemetry.find(compositionIdentifier);
551 if (iter != compositionsSendingAllTelemetry.end())
552 compositionsSendingAllTelemetry.erase(iter);
553}
554
558bool VuoRuntimeCommunicator::isSubscribedToAllTelemetry(const char *compositionIdentifier)
559{
560 set<string>::iterator iter = compositionsSendingAllTelemetry.find(compositionIdentifier);
561 return (iter != compositionsSendingAllTelemetry.end());
562}
563
569bool VuoRuntimeCommunicator::shouldSendPortDataTelemetry(const char *compositionIdentifier, const char *portIdentifier)
570{
571 return (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier));
572}
573
579char * VuoRuntimeCommunicator::mergeEnumDetails(string type, const char *details)
580{
581 string allowedValuesFunctionName = type + "_getAllowedValues";
582 typedef void *(*allowedValuesFunctionType)(void);
583 allowedValuesFunctionType allowedValuesFunction = (allowedValuesFunctionType)dlsym(RTLD_SELF, allowedValuesFunctionName.c_str());
584 if (!allowedValuesFunction)
585 return NULL;
586
587 string getJsonFunctionName = type + "_getJson";
588 typedef json_object *(*getJsonFunctionType)(int64_t);
589 getJsonFunctionType getJsonFunction = (getJsonFunctionType)dlsym(RTLD_SELF, getJsonFunctionName.c_str());
590 if (!getJsonFunction)
591 return NULL;
592
593 string summaryFunctionName = type + "_getSummary";
594 typedef char *(*summaryFunctionType)(int64_t);
595 summaryFunctionType summaryFunction = (summaryFunctionType)dlsym(RTLD_SELF, summaryFunctionName.c_str());
596 if (!summaryFunction)
597 return NULL;
598
599 string listCountFunctionName = "VuoListGetCount_" + type;
600 typedef unsigned long (*listCountFunctionType)(void *);
601 listCountFunctionType listCountFunction = (listCountFunctionType)dlsym(RTLD_SELF, listCountFunctionName.c_str());
602 if (!listCountFunction)
603 return NULL;
604
605 string listValueFunctionName = "VuoListGetValue_" + type;
606 typedef int64_t (*listValueFunctionType)(void *, unsigned long);
607 listValueFunctionType listValueFunction = (listValueFunctionType)dlsym(RTLD_SELF, listValueFunctionName.c_str());
608 if (!listValueFunction)
609 return NULL;
610
611 json_object *detailsJson = json_tokener_parse(details);
612 if (!detailsJson)
613 return NULL;
614
615 void *allowedValues = allowedValuesFunction();
616 VuoRetain(allowedValues);
617 unsigned long listCount = listCountFunction(allowedValues);
618 json_object *menuItems = json_object_new_array();
619 for (unsigned long i = 1; i <= listCount; ++i)
620 {
621 int64_t value = listValueFunction(allowedValues, i);
622 json_object *js = getJsonFunction(value);
623 if (!json_object_is_type(js, json_type_string))
624 continue;
625 const char *key = json_object_get_string(js);
626 char *summary = summaryFunction(value);
627
628 json_object *menuItem = json_object_new_object();
629 json_object_object_add(menuItem, "value", json_object_new_string(key));
630 json_object_object_add(menuItem, "name", json_object_new_string(summary));
631 json_object_array_add(menuItems, menuItem);
632
633 free(summary);
634 }
635 VuoRelease(allowedValues);
636
637 if (json_object_array_length(menuItems))
638 json_object_object_add(detailsJson, "menuItems", menuItems);
639
640 char *newDetails = strdup(json_object_to_json_string(detailsJson));
641 json_object_put(detailsJson);
642
643 return newDetails;
644}
645
646void VuoRuntimeCommunicator::sendHeartbeat(bool blocking)
647{
648 if (blocking)
649 // When called with blocking=true, we're already on telemetryQueue.
650 vuoSend("VuoTelemetry", zmqTelemetry, VuoTelemetryHeartbeat, nullptr, 0, false, nullptr);
651 else
652 sendTelemetry(VuoTelemetryHeartbeat, nullptr, 0);
653}
654
659{
660 if (! zmqTelemetry)
661 return;
662
663 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
664 telemetryTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
665 dispatch_source_set_timer(telemetryTimer, dispatch_walltime(NULL, 0), NSEC_PER_SEC/1000, NSEC_PER_SEC/1000);
666
667 dispatch_source_set_event_handler(telemetryTimer, ^{
668 sendHeartbeat();
669 });
670
671 dispatch_source_set_cancel_handler(telemetryTimer, ^{
672 dispatch_semaphore_signal(telemetryCanceled);
673 });
674
675 dispatch_resume(telemetryTimer);
676}
677
682{
683 if (! zmqTelemetry)
684 return;
685
686 dispatch_source_cancel(telemetryTimer);
687 dispatch_semaphore_wait(telemetryCanceled, DISPATCH_TIME_FOREVER);
688 dispatch_sync(telemetryQueue, ^{
689 // zmq_close calls POSIX close(), whose documentation says "queued data are discarded".
690 // Since telemetry uses non-blocking sends, issue one final _blocking_ send
691 // to ensure that the queue is drained before we close.
692 sendHeartbeat(true);
693
694 zmq_close(zmqTelemetry);
695 zmqTelemetry = NULL;
696 });
697
698 dispatch_release(telemetryTimer);
699 telemetryTimer = NULL;
700}
701
706{
707 if (! zmqControl)
708 return;
709
710 controlTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, controlQueue);
711 dispatch_source_set_timer(controlTimer, dispatch_walltime(NULL,0), NSEC_PER_SEC/1000, NSEC_PER_SEC/1000);
712
713 dispatch_source_set_event_handler(controlTimer, ^{
714
716
717 zmq_pollitem_t items[]=
718 {
719 {zmqControl,0,ZMQ_POLLIN,0},
720 {zmqSelfReceive,0,ZMQ_POLLIN,0},
721 };
722 int itemCount = 2;
723 long timeout = -1; // Wait forever (VuoStopComposition will send a message ZMQSelfReceive when it's time to stop).
724 zmq_poll(items,itemCount,timeout);
725 if(!(items[0].revents & ZMQ_POLLIN))
726 return;
727
728 enum VuoControlRequest control = (enum VuoControlRequest) vuoReceiveInt(zmqControl, NULL);
729 VuoCompositionState compositionState = { (void *)persistentState->runtimeState, "" };
730
731 switch (control)
732 {
734 {
735 dispatch_source_set_timer(telemetryTimer, dispatch_walltime(NULL, 0), NSEC_PER_SEC/2, NSEC_PER_SEC/100);
736
737 sendControlReply(VuoControlReplyHeartbeatSlowed,NULL,0);
738 break;
739 }
741 {
742 int timeoutInSeconds = vuoReceiveInt(zmqControl, NULL);
743 bool isBeingReplaced = vuoReceiveBool(zmqControl, NULL);
744
745 persistentState->runtimeState->stopCompositionAsOrderedByRunner(isBeingReplaced, timeoutInSeconds);
746
747 sendControlReply(VuoControlReplyCompositionStopping,NULL,0);
748 break;
749 }
751 {
752 persistentState->runtimeState->pauseComposition();
753
754 sendControlReply(VuoControlReplyCompositionPaused,NULL,0);
755 break;
756 }
758 {
759 persistentState->runtimeState->unpauseComposition();
760
761 sendControlReply(VuoControlReplyCompositionUnpaused,NULL,0);
762 break;
763 }
766 {
767 bool shouldUseInterprocessSerialization = vuoReceiveBool(zmqControl, NULL);
768 char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
769 char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
770
771 compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
772 char *valueAsString = persistentState->nodeRegistry->getPortValue(&compositionState, portIdentifier, shouldUseInterprocessSerialization);
773
774 zmq_msg_t messages[1];
775 vuoInitMessageWithString(&messages[0], valueAsString);
776 sendControlReply(control == VuoControlRequestInputPortValueRetrieve ?
778 messages,1);
779
780 free(compositionIdentifier);
781 free(portIdentifier);
782 free(valueAsString);
783 break;
784 }
787 {
788 char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
789 char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
790
791 compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
792 char *summary = persistentState->nodeRegistry->getPortSummary(&compositionState, portIdentifier);
793
794 zmq_msg_t messages[1];
795 vuoInitMessageWithString(&messages[0], summary);
796 sendControlReply(control == VuoControlRequestInputPortSummaryRetrieve ?
798 messages,1);
799
800 free(compositionIdentifier);
801 free(portIdentifier);
802 free(summary);
803 break;
804 }
806 {
807 char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
808 char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
809
810 compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
811 persistentState->nodeRegistry->fireTriggerPortEvent(&compositionState, portIdentifier);
812
813 sendControlReply(VuoControlReplyTriggerPortFiredEvent,NULL,0);
814
815 free(compositionIdentifier);
816 free(portIdentifier);
817 break;
818 }
820 {
821 char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
822 char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
823 char *valueAsString = vuoReceiveAndCopyString(zmqControl, NULL);
824
825 compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
826 persistentState->nodeRegistry->setPortValue(&compositionState, portIdentifier, valueAsString);
827
828 sendControlReply(VuoControlReplyInputPortValueModified,NULL,0);
829
830 free(compositionIdentifier);
831 free(portIdentifier);
832 free(valueAsString);
833 break;
834 }
836 {
837 int count = getPublishedInputPortCount();
838 char **names = getPublishedInputPortNames();
839
840 zmq_msg_t messages[count];
841 for (int i = 0; i < count; ++i)
842 vuoInitMessageWithString(&messages[i], names[i]);
843
844 sendControlReply(VuoControlReplyPublishedInputPortNamesRetrieved,messages,count);
845 break;
846 }
848 {
849 int count = getPublishedOutputPortCount();
850 char **names = getPublishedOutputPortNames();
851
852 zmq_msg_t messages[count];
853 for (int i = 0; i < count; ++i)
854 vuoInitMessageWithString(&messages[i], names[i]);
855
856 sendControlReply(VuoControlReplyPublishedOutputPortNamesRetrieved,messages,count);
857 break;
858 }
860 {
861 int count = getPublishedInputPortCount();
862 char **names = getPublishedInputPortTypes();
863
864 zmq_msg_t messages[count];
865 for (int i = 0; i < count; ++i)
866 vuoInitMessageWithString(&messages[i], names[i]);
867
868 sendControlReply(VuoControlReplyPublishedInputPortTypesRetrieved,messages,count);
869 break;
870 }
872 {
873 int count = getPublishedOutputPortCount();
874 char **names = getPublishedOutputPortTypes();
875
876 zmq_msg_t messages[count];
877 for (int i = 0; i < count; ++i)
878 vuoInitMessageWithString(&messages[i], names[i]);
879
880 sendControlReply(VuoControlReplyPublishedOutputPortTypesRetrieved,messages,count);
881 break;
882 }
884 {
885 int count = getPublishedInputPortCount();
886 char **types = getPublishedInputPortTypes();
887 char **names = getPublishedInputPortDetails();
888
889 zmq_msg_t messages[count];
890 for (int i = 0; i < count; ++i)
891 {
892 char *newDetails = mergeEnumDetails(types[i], names[i]);
893 if (newDetails)
894 names[i] = newDetails;
895 vuoInitMessageWithString(&messages[i], names[i]);
896 if (newDetails)
897 free(newDetails);
898 }
899
900 sendControlReply(VuoControlReplyPublishedInputPortDetailsRetrieved,messages,count);
901 break;
902 }
904 {
905 int count = getPublishedOutputPortCount();
906 char **names = getPublishedOutputPortDetails();
907
908 zmq_msg_t messages[count];
909 for (int i = 0; i < count; ++i)
910 vuoInitMessageWithString(&messages[i], names[i]);
911
912 sendControlReply(VuoControlReplyPublishedOutputPortDetailsRetrieved,messages,count);
913 break;
914 }
916 {
917 int count = vuoReceiveInt(zmqControl, NULL);
918
919 char **names = (char **)malloc(count * sizeof(char *));
920 for (int i = 0; i < count; ++i)
921 names[i] = vuoReceiveAndCopyString(zmqControl, NULL);
922
923 firePublishedInputPortEvent(names, count);
924
925 for (int i = 0; i < count; ++i)
926 free(names[i]);
927 free(names);
928
929 sendControlReply(VuoControlReplyPublishedInputPortFiredEvent,NULL,0);
930 break;
931 }
933 {
935
936 while (VuoTelemetry_hasMoreToReceive(zmqControl))
937 {
938 char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
939 char *valueAsString = vuoReceiveAndCopyString(zmqControl, NULL);
940
941 setPublishedInputPortValue(portIdentifier, valueAsString);
942
943 free(portIdentifier);
944 free(valueAsString);
945 }
946
947 sendControlReply(VuoControlReplyPublishedInputPortValueModified,NULL,0);
948
950 break;
951 }
953 {
955
956 bool shouldUseInterprocessSerialization = vuoReceiveBool(zmqControl, NULL);
957 char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
958 char *valueAsString = getPublishedInputPortValue(portIdentifier, shouldUseInterprocessSerialization);
959 free(portIdentifier);
960 zmq_msg_t messages[1];
961 vuoInitMessageWithString(&messages[0], valueAsString);
962 free(valueAsString);
963 sendControlReply(VuoControlReplyPublishedInputPortValueRetrieved,messages,1);
964
966 break;
967 }
969 {
971
972 bool shouldUseInterprocessSerialization = vuoReceiveBool(zmqControl, NULL);
973 char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
974 char *valueAsString = getPublishedOutputPortValue(portIdentifier, shouldUseInterprocessSerialization);
975 free(portIdentifier);
976 zmq_msg_t messages[1];
977 vuoInitMessageWithString(&messages[0], valueAsString);
978 free(valueAsString);
979 sendControlReply(VuoControlReplyPublishedOutputPortValueRetrieved,messages,1);
980
982 break;
983 }
986 {
987 char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
988 char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
989
990 compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
991 subscribeToPortDataTelemetry(compositionState.compositionIdentifier, portIdentifier);
992
993 char *summary = persistentState->nodeRegistry->getPortSummary(&compositionState, portIdentifier);
994
995 zmq_msg_t messages[1];
996 vuoInitMessageWithString(&messages[0], summary);
997 sendControlReply(control == VuoControlRequestInputPortTelemetrySubscribe ?
999 messages,1);
1000
1001 free(compositionIdentifier);
1002 free(portIdentifier);
1003 free(summary);
1004 break;
1005 }
1008 {
1009 char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1010 char *portIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1011
1012 compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1013 unsubscribeFromPortDataTelemetry(compositionState.compositionIdentifier, portIdentifier);
1014
1015 sendControlReply(control == VuoControlRequestInputPortTelemetryUnsubscribe ?
1017 NULL,0);
1018
1019 free(compositionIdentifier);
1020 free(portIdentifier);
1021 break;
1022 }
1024 {
1025 char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1026
1027 compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1028 subscribeToEventTelemetry(compositionState.compositionIdentifier);
1029
1030 sendControlReply(VuoControlReplyEventTelemetrySubscribed,NULL,0);
1031
1032 free(compositionIdentifier);
1033 break;
1034 }
1036 {
1037 char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1038
1039 compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1040 unsubscribeFromEventTelemetry(compositionState.compositionIdentifier);
1041
1042 sendControlReply(VuoControlReplyEventTelemetryUnsubscribed,NULL,0);
1043
1044 free(compositionIdentifier);
1045 break;
1046 }
1048 {
1049 char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1050
1051 compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1052 subscribeToAllTelemetry(compositionState.compositionIdentifier);
1053
1054 sendControlReply(VuoControlReplyAllTelemetrySubscribed,NULL,0);
1055
1056 free(compositionIdentifier);
1057 break;
1058 }
1060 {
1061 char *compositionIdentifier = vuoReceiveAndCopyString(zmqControl, NULL);
1062
1063 compositionState.compositionIdentifier = persistentState->nodeRegistry->defaultToTopLevelCompositionIdentifier(compositionIdentifier);
1064 unsubscribeFromAllTelemetry(compositionState.compositionIdentifier);
1065
1066 sendControlReply(VuoControlReplyAllTelemetryUnsubscribed,NULL,0);
1067
1068 free(compositionIdentifier);
1069 break;
1070 }
1071 }
1072 });
1073
1074 dispatch_source_set_cancel_handler(controlTimer, ^{
1075
1076 persistentState->runtimeState->breakOutOfEventLoop();
1077 dispatch_semaphore_signal(controlCanceled);
1078
1079 });
1080
1081 dispatch_resume(controlTimer);
1082}
1083
1088{
1089 if (! zmqControl)
1090 return;
1091
1092 dispatch_source_cancel(controlTimer);
1093}
1094
1099{
1100 if (! zmqSelfSend)
1101 return;
1102
1103 char z = 0;
1104 zmq_msg_t message;
1105 zmq_msg_init_size(&message, sizeof z);
1106 memcpy(zmq_msg_data(&message), &z, sizeof z);
1107 if (zmq_msg_send(&message, static_cast<zmq_msg_t *>(zmqSelfSend), 0) == -1)
1108 VUserLog("Couldn't break: %s (%d)", zmq_strerror(errno), errno);
1109 zmq_msg_close(&message);
1110}
1111
1116{
1117 if (! zmqControl)
1118 return;
1119
1120 dispatch_semaphore_wait(controlCanceled, DISPATCH_TIME_FOREVER);
1121 dispatch_sync(controlQueue, ^{
1122 zmq_close(zmqControl);
1123 zmqControl = NULL;
1124 });
1125
1126 dispatch_release(controlTimer);
1127 controlTimer = NULL;
1128
1129 if (zmqSelfSend)
1130 {
1131 zmq_close(zmqSelfSend);
1132 zmqSelfSend = NULL;
1133 }
1134
1135 if (zmqSelfReceive)
1136 {
1137 zmq_close(zmqSelfReceive);
1138 zmqSelfReceive = NULL;
1139 }
1140}
1141
1147{
1148 if (runnerPipe < 0)
1149 return;
1150
1151 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
1152 dispatch_async(queue, ^{
1153 char buf[1];
1154 int ret;
1155 int readError;
1156 do {
1157 ret = read(runnerPipe, &buf, 1);
1158 readError = errno;
1159 usleep(USEC_PER_SEC / 100);
1160 } while (ret == -1 && readError == EAGAIN);
1161
1162 _hasZmqConnection = false;
1164 });
1165}
1166
1167extern "C"
1168{
1172void vuoSendNodeExecutionStarted(VuoCompositionState *compositionState, const char *nodeIdentifier)
1173{
1175 return runtimeState->persistentState->communicator->sendNodeExecutionStarted(compositionState->compositionIdentifier, nodeIdentifier);
1176}
1177
1181void vuoSendNodeExecutionFinished(VuoCompositionState *compositionState, const char *nodeIdentifier)
1182{
1185}
1186
1190void vuoSendInputPortsUpdated(VuoCompositionState *compositionState, const char *portIdentifier, bool receivedEvent, bool receivedData, const char *portDataSummary)
1191{
1193 return runtimeState->persistentState->communicator->sendInputPortsUpdated(compositionState->compositionIdentifier, portIdentifier, receivedEvent, receivedData, portDataSummary);
1194}
1195
1201void vuoSendOutputPortsUpdated(VuoCompositionState *compositionState, const char *portIdentifier, bool sentEvent, bool sentData, const char *portDataSummary)
1202{
1204 return runtimeState->persistentState->communicator->sendOutputPortsUpdated(compositionState->compositionIdentifier, portIdentifier, sentEvent, sentData, portDataSummary);
1205}
1206
1210void vuoSendPublishedOutputPortsUpdated(VuoCompositionState *compositionState, const char *portIdentifier, bool sentData, const char *portDataSummary)
1211{
1213 return runtimeState->persistentState->communicator->sendPublishedOutputPortsUpdated(portIdentifier, sentData, portDataSummary);
1214}
1215
1221void vuoSendEventFinished(VuoCompositionState *compositionState, unsigned long eventId)
1222{
1225 return runtimeState->persistentState->communicator->sendEventFinished(eventId, compositionContext);
1226}
1227
1231void vuoSendEventDropped(VuoCompositionState *compositionState, const char *portIdentifier)
1232{
1234 return runtimeState->persistentState->communicator->sendEventDropped(compositionState->compositionIdentifier, portIdentifier);
1235}
1236
1240void vuoSendError(VuoCompositionState *compositionState, const char *message)
1241{
1244}
1245
1249bool vuoShouldSendPortDataTelemetry(VuoCompositionState *compositionState, const char *portIdentifier)
1250{
1253}
1254
1258char * vuoGetInputPortString(VuoCompositionState *compositionState, const char *portIdentifier, bool shouldUseInterprocessSerialization)
1259{
1261 return runtimeState->persistentState->nodeRegistry->getPortValue(compositionState, portIdentifier, shouldUseInterprocessSerialization);
1262}
1263
1267char * vuoGetOutputPortString(VuoCompositionState *compositionState, const char *portIdentifier, bool shouldUseInterprocessSerialization)
1268{
1270 return runtimeState->persistentState->nodeRegistry->getPortValue(compositionState, portIdentifier, shouldUseInterprocessSerialization);
1271}
1272
1273} // extern "C"