// NOTE(review): the header of the enclosing definition is not part of this
// excerpt (presumably the constructor — confirm). These statements put the
// communicator into its initial, unconnected state.
26 this->persistentState = persistentState;
// No ZMQ connection has been opened yet.
28 _hasZmqConnection =
false;
32 zmqSelfReceive = NULL;
// Dispatch queues for control and telemetry work. Passing NULL attributes to
// dispatch_queue_create() yields a serial queue.
36 controlQueue = dispatch_queue_create(
"org.vuo.runtime.control", NULL);
37 telemetryQueue = dispatch_queue_create(
"org.vuo.runtime.telemetry", NULL);
39 telemetryTimer = NULL;
// Both semaphores start at 0, so a wait on one blocks until the corresponding
// timer's cancel handler signals it.
40 controlCanceled = dispatch_semaphore_create(0);
41 telemetryCanceled = dispatch_semaphore_create(0);
// Function pointers into the composition. They start out unresolved and are
// assigned from dlsym() lookups elsewhere in this file.
45 vuoInstanceInit = NULL;
46 vuoInstanceTriggerStart = NULL;
47 vuoInstanceTriggerStop = NULL;
48 vuoSetInputPortValue = NULL;
49 getPublishedInputPortCount = NULL;
50 getPublishedOutputPortCount = NULL;
51 getPublishedInputPortNames = NULL;
52 getPublishedOutputPortNames = NULL;
53 getPublishedInputPortTypes = NULL;
54 getPublishedOutputPortTypes = NULL;
55 getPublishedInputPortDetails = NULL;
56 getPublishedOutputPortDetails = NULL;
57 firePublishedInputPortEvent = NULL;
58 setPublishedInputPortValue = NULL;
59 getPublishedInputPortValue = NULL;
60 getPublishedOutputPortValue = NULL;
// NOTE(review): the enclosing header is not part of this excerpt (presumably
// the destructor — confirm). Releases the two dispatch queues and the two
// semaphores that this file creates with dispatch_queue_create() and
// dispatch_semaphore_create().
68 dispatch_release(controlQueue);
69 dispatch_release(telemetryQueue);
70 dispatch_release(controlCanceled);
71 dispatch_release(telemetryCanceled);
// NOTE(review): the enclosing header is not part of this excerpt.
//
// Resolves each composition entry point by name from compositionBinaryHandle
// with dlsym() and stores it in the member function pointer of the same name.
// Every lookup is followed by a NULL check; on that path a message naming the
// missing function, followed by dlerror(), is appended to errorMessage. What
// is done with errorMessage afterwards happens on lines not shown here.
81 ostringstream errorMessage;
// Composition lifecycle and input-port entry points.
83 vuoInstanceInit = (vuoInstanceInitType) dlsym(compositionBinaryHandle,
"vuoInstanceInit");
84 if (! vuoInstanceInit)
86 errorMessage <<
"The composition couldn't be started because its vuoInstanceInit() function couldn't be found : " << dlerror();
90 vuoInstanceTriggerStart = (vuoInstanceTriggerStartType) dlsym(compositionBinaryHandle,
"vuoInstanceTriggerStart");
91 if (! vuoInstanceTriggerStart)
93 errorMessage <<
"The composition couldn't be started because its vuoInstanceTriggerStart() function couldn't be found : " << dlerror();
97 vuoInstanceTriggerStop = (vuoInstanceTriggerStopType) dlsym(compositionBinaryHandle,
"vuoInstanceTriggerStop");
98 if (! vuoInstanceTriggerStop)
100 errorMessage <<
"The composition couldn't be started because its vuoInstanceTriggerStop() function couldn't be found : " << dlerror();
104 vuoSetInputPortValue = (vuoSetInputPortValueType) dlsym(compositionBinaryHandle,
"vuoSetInputPortValue");
105 if (! vuoSetInputPortValue)
107 errorMessage <<
"The composition couldn't be started because its vuoSetInputPortValue() function couldn't be found : " << dlerror();
// Published-port metadata accessors (count, names, types, details).
111 getPublishedInputPortCount = (getPublishedInputPortCountType) dlsym(compositionBinaryHandle,
"getPublishedInputPortCount");
112 if (! getPublishedInputPortCount)
114 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortCount() function couldn't be found : " << dlerror();
118 getPublishedOutputPortCount = (getPublishedOutputPortCountType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortCount");
119 if (! getPublishedOutputPortCount)
121 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortCount() function couldn't be found : " << dlerror();
125 getPublishedInputPortNames = (getPublishedInputPortNamesType) dlsym(compositionBinaryHandle,
"getPublishedInputPortNames");
126 if (! getPublishedInputPortNames)
128 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortNames() function couldn't be found : " << dlerror();
132 getPublishedOutputPortNames = (getPublishedOutputPortNamesType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortNames");
133 if (! getPublishedOutputPortNames)
135 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortNames() function couldn't be found : " << dlerror();
139 getPublishedInputPortTypes = (getPublishedInputPortTypesType) dlsym(compositionBinaryHandle,
"getPublishedInputPortTypes");
140 if (! getPublishedInputPortTypes)
142 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortTypes() function couldn't be found : " << dlerror();
146 getPublishedOutputPortTypes = (getPublishedOutputPortTypesType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortTypes");
147 if (! getPublishedOutputPortTypes)
149 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortTypes() function couldn't be found : " << dlerror();
153 getPublishedInputPortDetails = (getPublishedInputPortDetailsType) dlsym(compositionBinaryHandle,
"getPublishedInputPortDetails");
154 if (! getPublishedInputPortDetails)
156 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortDetails() function couldn't be found : " << dlerror();
160 getPublishedOutputPortDetails = (getPublishedOutputPortDetailsType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortDetails");
161 if (! getPublishedOutputPortDetails)
163 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortDetails() function couldn't be found : " << dlerror();
// Published-port event and value accessors.
167 firePublishedInputPortEvent = (firePublishedInputPortEventType) dlsym(compositionBinaryHandle,
"firePublishedInputPortEvent");
168 if (! firePublishedInputPortEvent)
170 errorMessage <<
"The composition couldn't be started because its firePublishedInputPortEvent() function couldn't be found : " << dlerror();
174 setPublishedInputPortValue = (setPublishedInputPortValueType) dlsym(compositionBinaryHandle,
"setPublishedInputPortValue");
175 if (! setPublishedInputPortValue)
177 errorMessage <<
"The composition couldn't be started because its setPublishedInputPortValue() function couldn't be found : " << dlerror();
181 getPublishedInputPortValue = (getPublishedInputPortValueType) dlsym(compositionBinaryHandle,
"getPublishedInputPortValue");
182 if (! getPublishedInputPortValue)
184 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortValue() function couldn't be found : " << dlerror();
188 getPublishedOutputPortValue = (getPublishedOutputPortValueType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortValue");
189 if (! getPublishedOutputPortValue)
191 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortValue() function couldn't be found : " << dlerror();
// NOTE(review): the enclosing header and several conditions are not part of
// this excerpt. The visible statements create and configure the ZMQ sockets
// used to talk to the runner.
207 _hasZmqConnection =
true;
// Reuse the supplied context when there is one; otherwise create a context
// (the argument to zmq_init() is the number of I/O threads).
208 zmqContext = (_zmqContext ? _zmqContext : zmq_init(1));
// Control uses a reply (REP) socket; telemetry uses a publish (PUB) socket.
210 zmqControl = zmq_socket(zmqContext,ZMQ_REP);
211 zmqTelemetry = zmq_socket(zmqContext,ZMQ_PUB);
214 ostringstream errorMessage;
// The condition that leads to this message is on a line not shown.
218 errorMessage <<
"The composition couldn't start because it couldn't establish communication to be controlled by the runner : " << zmq_strerror(errno) << endl;
// A pair of in-process sockets on the same endpoint: zmqSelfReceive binds,
// zmqSelfSend connects.
222 zmqSelfReceive = zmq_socket(zmqContext, ZMQ_PAIR);
223 if (zmq_bind(zmqSelfReceive,
"inproc://vuo-runtime-self") != 0)
225 errorMessage <<
"Couldn't bind self-receive socket: " << zmq_strerror(errno) <<
" " << errno << endl;
229 zmqSelfSend = zmq_socket(zmqContext, ZMQ_PAIR);
230 if (zmq_connect(zmqSelfSend,
"inproc://vuo-runtime-self") != 0)
232 errorMessage <<
"Couldn't connect self-send socket: " << zmq_strerror(errno) <<
" " << errno << endl;
// A send high-water mark of 0 means "no limit" in ZeroMQ.
236 const int highWaterMark = 0;
237 if(zmq_setsockopt(zmqTelemetry,ZMQ_SNDHWM,&highWaterMark,
sizeof(highWaterMark)))
239 errorMessage <<
"Couldn't set high-water mark on telemetry socket: " << zmq_strerror(errno) <<
" " << errno << endl;
// The condition that leads to this message is on a line not shown.
245 errorMessage <<
"The composition couldn't start because it couldn't establish communication to be listened to by the runner : " << zmq_strerror(errno) << endl;
// Closes all four sockets; zmqSelfReceive is also reset to NULL.
// NOTE(review): the circumstances under which this runs are not visible here.
251 zmq_close(zmqControl);
253 zmq_close(zmqSelfSend);
255 zmq_close(zmqSelfReceive);
256 zmqSelfReceive = NULL;
257 zmq_close(zmqTelemetry);
// NOTE(review): fragments whose enclosing headers are not part of this
// excerpt; they appear to belong to more than one definition.
// Guard on the connection flag; the guarded statement is on a line not shown.
269 if (! _hasZmqConnection)
// Terminate the ZMQ context and record that the connection is gone.
272 zmq_ctx_term(zmqContext);
275 _hasZmqConnection =
false;
// Reports the current value of the connection flag.
283 return _hasZmqConnection;
297void VuoRuntimeCommunicator::sendControlReply(
enum VuoControlReply reply, zmq_msg_t *messages,
unsigned int messageCount)
299 vuoSend(
"VuoControl",zmqControl,reply,messages,messageCount,
false,NULL);
// Sends a telemetry message on the telemetry socket.
//
// type: the telemetry type passed to vuoSend().
// messages: the message parts passed to vuoSend().
// messageCount: the number of elements in messages.
//
// NOTE(review): lines between the signature and the dispatch_sync() call are
// not part of this excerpt — confirm whether anything is checked first.
305void VuoRuntimeCommunicator::sendTelemetry(
enum VuoTelemetry type, zmq_msg_t *messages,
unsigned int messageCount)
// The send runs on telemetryQueue; dispatch_sync() does not return until the
// block has finished, so the caller waits for the send.
311 dispatch_sync(telemetryQueue, ^{
// Unlike the control reply, the sixth argument here is true.
313 vuoSend(
"VuoTelemetry",zmqTelemetry,type,messages,messageCount,
true,NULL);
// NOTE(review): fragments of several telemetry-sending definitions whose
// headers are not part of this excerpt. Each visible guard tests whether
// anything is subscribed to the telemetry about to be sent; the guarded
// statement is on a line not shown (presumably an early return — confirm).

// Guard: all-telemetry or event-telemetry subscription for this composition.
324 if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier)))
327 zmq_msg_t messages[2];
341 if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier)))
344 zmq_msg_t messages[2];
// Guard: all-telemetry, event-telemetry, or this port's data telemetry.
358 bool isSendingAllTelemetry = isSubscribedToAllTelemetry(compositionIdentifier);
359 bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
360 if (! (isSendingAllTelemetry || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
363 zmq_msg_t messages[5];
// The data summary is included only when it is non-NULL and the subscription
// covers port data (all telemetry or this port); otherwise an empty string is
// sent in its place.
368 vuoInitMessageWithString(&messages[4], (portDataSummary && (isSendingAllTelemetry || isSendingPortTelemetry)) ? portDataSummary :
"");
// Same guard and summary rule as the fragment above.
380 bool isSendingAllTelemetry = isSubscribedToAllTelemetry(compositionIdentifier);
381 bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
382 if (! (isSendingAllTelemetry || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
385 zmq_msg_t messages[5];
390 vuoInitMessageWithString(&messages[4], (portDataSummary && (isSendingAllTelemetry || isSendingPortTelemetry)) ? portDataSummary :
"");
400 zmq_msg_t messages[3];
428 bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
429 if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
432 zmq_msg_t messages[2];
// Guard on the connection flag; the guarded statement is on a line not shown.
444 if (! _hasZmqConnection)
447 zmq_msg_t messages[1];
// NOTE(review): the control socket is closed here and also in other fragments
// of this file — confirm these paths are mutually exclusive.
468 zmq_close(zmqControl);
475void VuoRuntimeCommunicator::subscribeToPortDataTelemetry(
const char *compositionIdentifier,
const char *portIdentifer)
477 portsSendingDataTelemetry[compositionIdentifier].insert(portIdentifer);
483void VuoRuntimeCommunicator::unsubscribeFromPortDataTelemetry(
const char *compositionIdentifier,
const char *portIdentifer)
485 map<string, set<string> >::iterator iter1 = portsSendingDataTelemetry.find(compositionIdentifier);
486 if (iter1 != portsSendingDataTelemetry.end())
488 set<string>::iterator iter2 = iter1->second.find(portIdentifer);
489 if (iter2 != iter1->second.end())
491 iter1->second.erase(iter2);
492 if (iter1->second.empty())
493 portsSendingDataTelemetry.erase(iter1);
501bool VuoRuntimeCommunicator::isSubscribedToPortDataTelemetry(
const char *compositionIdentifier,
const char *portIdentifer)
503 map<string, set<string> >::iterator iter = portsSendingDataTelemetry.find(compositionIdentifier);
504 if (iter != portsSendingDataTelemetry.end())
505 return (iter->second.find(portIdentifer) != iter->second.end());
513void VuoRuntimeCommunicator::subscribeToEventTelemetry(
const char *compositionIdentifier)
515 compositionsSendingEventTelemetry.insert(compositionIdentifier);
521void VuoRuntimeCommunicator::unsubscribeFromEventTelemetry(
const char *compositionIdentifier)
523 set<string>::iterator iter = compositionsSendingEventTelemetry.find(compositionIdentifier);
524 if (iter != compositionsSendingEventTelemetry.end())
525 compositionsSendingEventTelemetry.erase(iter);
531bool VuoRuntimeCommunicator::isSubscribedToEventTelemetry(
const char *compositionIdentifier)
533 set<string>::iterator iter = compositionsSendingEventTelemetry.find(compositionIdentifier);
534 return (iter != compositionsSendingEventTelemetry.end());
540void VuoRuntimeCommunicator::subscribeToAllTelemetry(
const char *compositionIdentifier)
542 compositionsSendingAllTelemetry.insert(compositionIdentifier);
548void VuoRuntimeCommunicator::unsubscribeFromAllTelemetry(
const char *compositionIdentifier)
550 set<string>::iterator iter = compositionsSendingAllTelemetry.find(compositionIdentifier);
551 if (iter != compositionsSendingAllTelemetry.end())
552 compositionsSendingAllTelemetry.erase(iter);
558bool VuoRuntimeCommunicator::isSubscribedToAllTelemetry(
const char *compositionIdentifier)
560 set<string>::iterator iter = compositionsSendingAllTelemetry.find(compositionIdentifier);
561 return (iter != compositionsSendingAllTelemetry.end());
// NOTE(review): the enclosing header is not part of this excerpt.
// True when the composition is subscribed to all telemetry or to data
// telemetry for this particular port. Event-only subscriptions do not count.
571 return (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier));
// Extends a port's JSON details with a "menuItems" array built from the
// values that the type's <type>_getAllowedValues() function returns.
//
// type: the data type's name, used to build the names of the helper
//     functions that are looked up at runtime.
// details: JSON text, parsed with json_tokener_parse() and then extended.
//
// The visible code produces newDetails, a strdup()'d serialization of the
// merged JSON.
// NOTE(review): the handling of failed lookups and the return statement(s)
// are on lines not part of this excerpt — confirm what is returned in each
// case and who frees the result.
579char * VuoRuntimeCommunicator::mergeEnumDetails(
string type,
const char *details)
// Look up the five helper functions by name with dlsym(RTLD_SELF, ...). Each
// lookup is followed by a NULL check.
581 string allowedValuesFunctionName = type +
"_getAllowedValues";
582 typedef void *(*allowedValuesFunctionType)(void);
583 allowedValuesFunctionType allowedValuesFunction = (allowedValuesFunctionType)dlsym(RTLD_SELF, allowedValuesFunctionName.c_str());
584 if (!allowedValuesFunction)
587 string getJsonFunctionName = type +
"_getJson";
588 typedef json_object *(*getJsonFunctionType)(int64_t);
589 getJsonFunctionType getJsonFunction = (getJsonFunctionType)dlsym(RTLD_SELF, getJsonFunctionName.c_str());
590 if (!getJsonFunction)
593 string summaryFunctionName = type +
"_getSummary";
594 typedef char *(*summaryFunctionType)(int64_t);
595 summaryFunctionType summaryFunction = (summaryFunctionType)dlsym(RTLD_SELF, summaryFunctionName.c_str());
596 if (!summaryFunction)
599 string listCountFunctionName =
"VuoListGetCount_" + type;
600 typedef unsigned long (*listCountFunctionType)(
void *);
601 listCountFunctionType listCountFunction = (listCountFunctionType)dlsym(RTLD_SELF, listCountFunctionName.c_str());
602 if (!listCountFunction)
605 string listValueFunctionName =
"VuoListGetValue_" + type;
606 typedef int64_t (*listValueFunctionType)(
void *,
unsigned long);
607 listValueFunctionType listValueFunction = (listValueFunctionType)dlsym(RTLD_SELF, listValueFunctionName.c_str());
608 if (!listValueFunction)
611 json_object *detailsJson = json_tokener_parse(details);
615 void *allowedValues = allowedValuesFunction();
// Build one {"value": ..., "name": ...} object per allowed value.
617 unsigned long listCount = listCountFunction(allowedValues);
618 json_object *menuItems = json_object_new_array();
// List indices start at 1.
619 for (
unsigned long i = 1; i <= listCount; ++i)
621 int64_t value = listValueFunction(allowedValues, i);
622 json_object *js = getJsonFunction(value);
// The statement guarded by this check is on a line not shown; the code below
// reads js as a string.
623 if (!json_object_is_type(js, json_type_string))
625 const char *key = json_object_get_string(js);
626 char *summary = summaryFunction(value);
628 json_object *menuItem = json_object_new_object();
629 json_object_object_add(menuItem,
"value", json_object_new_string(key));
630 json_object_object_add(menuItem,
"name", json_object_new_string(summary));
631 json_object_array_add(menuItems, menuItem);
// Attach the menu only when at least one item was produced.
// NOTE(review): no release of an empty menuItems array is visible here —
// confirm it is not leaked.
637 if (json_object_array_length(menuItems))
638 json_object_object_add(detailsJson,
"menuItems", menuItems);
// Copy the serialized text before detailsJson is released: the buffer
// returned by json_object_to_json_string() belongs to the JSON object.
640 char *newDetails = strdup(json_object_to_json_string(detailsJson));
641 json_object_put(detailsJson);
646void VuoRuntimeCommunicator::sendHeartbeat(
bool blocking)
// NOTE(review): fragments whose enclosing headers are not part of this
// excerpt; they cover both starting and stopping the telemetry timer.

// Create a timer source on the high-priority global queue. It first fires
// immediately and repeats every NSEC_PER_SEC/1000 ns (1 ms) with 1 ms leeway.
663 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
664 telemetryTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
665 dispatch_source_set_timer(telemetryTimer, dispatch_walltime(NULL, 0), NSEC_PER_SEC/1000, NSEC_PER_SEC/1000);
// The body of the event handler is on lines not shown.
667 dispatch_source_set_event_handler(telemetryTimer, ^{
// The cancel handler signals telemetryCanceled, which the stop path waits on.
671 dispatch_source_set_cancel_handler(telemetryTimer, ^{
672 dispatch_semaphore_signal(telemetryCanceled);
// Dispatch sources are created suspended; resume to start the timer.
675 dispatch_resume(telemetryTimer);
// Stop path: cancel the timer, then block until its cancel handler has run.
686 dispatch_source_cancel(telemetryTimer);
687 dispatch_semaphore_wait(telemetryCanceled, DISPATCH_TIME_FOREVER);
// The telemetry socket is closed inside a synchronous block on telemetryQueue,
// the same queue that sendTelemetry() sends on.
688 dispatch_sync(telemetryQueue, ^{
694 zmq_close(zmqTelemetry);
698 dispatch_release(telemetryTimer);
699 telemetryTimer = NULL;
// NOTE(review): fragment; the enclosing header and most of the handler body
// are not part of this excerpt.

// Timer source on controlQueue: first fires immediately, then every
// NSEC_PER_SEC/1000 ns (1 ms) with 1 ms leeway.
710 controlTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, controlQueue);
711 dispatch_source_set_timer(controlTimer, dispatch_walltime(NULL,0), NSEC_PER_SEC/1000, NSEC_PER_SEC/1000);
713 dispatch_source_set_event_handler(controlTimer, ^{
// Poll two sockets for readability: the control socket and the in-process
// self-receive socket. Fields are {socket, fd, events, revents}.
717 zmq_pollitem_t items[]=
719 {zmqControl,0,ZMQ_POLLIN,0},
720 {zmqSelfReceive,0,ZMQ_POLLIN,0},
724 zmq_poll(items,itemCount,timeout);
// Test whether the control socket has no message waiting; the guarded
// statement is on a line not shown.
725 if(!(items[0].revents & ZMQ_POLLIN))
// Re-arm the telemetry timer with an interval of NSEC_PER_SEC/2 (0.5 s) and a
// leeway of NSEC_PER_SEC/100 (10 ms).
735 dispatch_source_set_timer(telemetryTimer, dispatch_walltime(NULL, 0), NSEC_PER_SEC/2, NSEC_PER_SEC/100);
// NOTE(review): fragments of the control-request handling code. The enclosing
// header, the request dispatch, and most statements are not part of this
// excerpt. What is visible: each handler declares its reply messages and
// frees the identifier strings it used.

// Port value request: a flag read from the control socket is passed to
// getPortValue().
767 bool shouldUseInterprocessSerialization =
vuoReceiveBool(zmqControl, NULL);
772 char *valueAsString = persistentState->
nodeRegistry->
getPortValue(&compositionState, portIdentifier, shouldUseInterprocessSerialization);
774 zmq_msg_t messages[1];
780 free(compositionIdentifier);
781 free(portIdentifier);
794 zmq_msg_t messages[1];
800 free(compositionIdentifier);
801 free(portIdentifier);
815 free(compositionIdentifier);
816 free(portIdentifier);
830 free(compositionIdentifier);
831 free(portIdentifier);
// Published-port metadata requests: one reply message per published port.
// NOTE(review): messages[count] is a variable-length array sized by the
// composition's port count — confirm the behavior when count is 0.
837 int count = getPublishedInputPortCount();
838 char **names = getPublishedInputPortNames();
840 zmq_msg_t messages[count];
841 for (
int i = 0; i < count; ++i)
849 int count = getPublishedOutputPortCount();
850 char **names = getPublishedOutputPortNames();
852 zmq_msg_t messages[count];
853 for (
int i = 0; i < count; ++i)
// Here (and in the next fragment) the variable called names holds port types.
861 int count = getPublishedInputPortCount();
862 char **names = getPublishedInputPortTypes();
864 zmq_msg_t messages[count];
865 for (
int i = 0; i < count; ++i)
873 int count = getPublishedOutputPortCount();
874 char **names = getPublishedOutputPortTypes();
876 zmq_msg_t messages[count];
877 for (
int i = 0; i < count; ++i)
// Input port details: here names holds the details strings. Each one is run
// through mergeEnumDetails() together with the port's type, and the result
// replaces the entry.
// NOTE(review): any check between the two statements in the loop is on a line
// not shown — confirm how a NULL result is handled and who frees newDetails.
885 int count = getPublishedInputPortCount();
886 char **types = getPublishedInputPortTypes();
887 char **names = getPublishedInputPortDetails();
889 zmq_msg_t messages[count];
890 for (
int i = 0; i < count; ++i)
892 char *newDetails = mergeEnumDetails(types[i], names[i]);
894 names[i] = newDetails;
905 int count = getPublishedOutputPortCount();
906 char **names = getPublishedOutputPortDetails();
908 zmq_msg_t messages[count];
909 for (
int i = 0; i < count; ++i)
// Fire-event request: an array of count port names is built, passed to
// firePublishedInputPortEvent(), then walked again afterwards.
// NOTE(review): the malloc() result is not checked in the visible lines.
919 char **names = (
char **)malloc(count *
sizeof(
char *));
920 for (
int i = 0; i < count; ++i)
923 firePublishedInputPortEvent(names, count);
925 for (
int i = 0; i < count; ++i)
// Published input port value: set.
941 setPublishedInputPortValue(portIdentifier, valueAsString);
943 free(portIdentifier);
// Published input port value: get.
956 bool shouldUseInterprocessSerialization =
vuoReceiveBool(zmqControl, NULL);
958 char *valueAsString = getPublishedInputPortValue(portIdentifier, shouldUseInterprocessSerialization);
959 free(portIdentifier);
960 zmq_msg_t messages[1];
// Published output port value: get.
972 bool shouldUseInterprocessSerialization =
vuoReceiveBool(zmqControl, NULL);
974 char *valueAsString = getPublishedOutputPortValue(portIdentifier, shouldUseInterprocessSerialization);
975 free(portIdentifier);
976 zmq_msg_t messages[1];
995 zmq_msg_t messages[1];
1001 free(compositionIdentifier);
1002 free(portIdentifier);
1019 free(compositionIdentifier);
1020 free(portIdentifier);
// The remaining handlers take only a composition identifier.
1032 free(compositionIdentifier);
1044 free(compositionIdentifier);
1056 free(compositionIdentifier);
1068 free(compositionIdentifier);
// NOTE(review): fragments whose enclosing headers are not part of this
// excerpt; they cover starting, interrupting, and stopping the control timer.

// The cancel handler signals controlCanceled, which the stop path waits on.
1074 dispatch_source_set_cancel_handler(controlTimer, ^{
1077 dispatch_semaphore_signal(controlCanceled);
// Dispatch sources are created suspended; resume to start the timer.
1081 dispatch_resume(controlTimer);
1092 dispatch_source_cancel(controlTimer);
// Send the bytes of z over zmqSelfSend. Its peer, zmqSelfReceive, is one of
// the sockets the control handler polls, so this presumably wakes a pending
// zmq_poll() — confirm.
1105 zmq_msg_init_size(&message,
sizeof z);
1106 memcpy(zmq_msg_data(&message), &z,
sizeof z);
// NOTE(review): zmq_msg_send() takes the socket as void *; the cast to
// zmq_msg_t * looks unnecessary — confirm.
1107 if (zmq_msg_send(&message,
static_cast<zmq_msg_t *
>(zmqSelfSend), 0) == -1)
1108 VUserLog(
"Couldn't break: %s (%d)", zmq_strerror(errno), errno);
1109 zmq_msg_close(&message);
// Block until the control timer's cancel handler has run, then close the
// control socket inside a synchronous block on controlQueue.
1120 dispatch_semaphore_wait(controlCanceled, DISPATCH_TIME_FOREVER);
1121 dispatch_sync(controlQueue, ^{
1122 zmq_close(zmqControl);
1126 dispatch_release(controlTimer);
1127 controlTimer = NULL;
// Close the in-process socket pair; zmqSelfReceive is also reset to NULL.
1131 zmq_close(zmqSelfSend);
1137 zmq_close(zmqSelfReceive);
1138 zmqSelfReceive = NULL;
// NOTE(review): fragment; the enclosing header and part of the loop are not
// part of this excerpt.
// Runs asynchronously on the default-priority global queue.
1151 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
1152 dispatch_async(queue, ^{
// Try to read one byte from runnerPipe. The loop repeats, sleeping
// USEC_PER_SEC/100 µs (10 ms) per pass, for as long as the read fails with
// EAGAIN. readError is assigned on a line not shown (presumably from errno —
// confirm).
1157 ret = read(runnerPipe, &buf, 1);
1159 usleep(USEC_PER_SEC / 100);
1160 }
while (ret == -1 && readError == EAGAIN);
// Once the loop ends, the connection flag is cleared.
1162 _hasZmqConnection =
false;