26 this->persistentState = persistentState;
28 _hasZmqConnection =
false;
32 zmqSelfReceive = NULL;
36 controlQueue = dispatch_queue_create(
"org.vuo.runtime.control", NULL);
37 telemetryQueue = dispatch_queue_create(
"org.vuo.runtime.telemetry", NULL);
39 telemetryTimer = NULL;
40 controlCanceled = dispatch_semaphore_create(0);
41 telemetryCanceled = dispatch_semaphore_create(0);
45 vuoInstanceInit = NULL;
46 vuoInstanceTriggerStart = NULL;
47 vuoInstanceTriggerStop = NULL;
48 vuoSetInputPortValue = NULL;
49 getPublishedInputPortCount = NULL;
50 getPublishedOutputPortCount = NULL;
51 getPublishedInputPortNames = NULL;
52 getPublishedOutputPortNames = NULL;
53 getPublishedInputPortTypes = NULL;
54 getPublishedOutputPortTypes = NULL;
55 getPublishedInputPortDetails = NULL;
56 getPublishedOutputPortDetails = NULL;
57 firePublishedInputPortEvent = NULL;
58 setPublishedInputPortValue = NULL;
59 getPublishedInputPortValue = NULL;
60 getPublishedOutputPortValue = NULL;
68 dispatch_release(controlQueue);
69 dispatch_release(telemetryQueue);
70 dispatch_release(controlCanceled);
71 dispatch_release(telemetryCanceled);
81 ostringstream errorMessage;
83 vuoInstanceInit = (vuoInstanceInitType) dlsym(compositionBinaryHandle,
"vuoInstanceInit");
84 if (! vuoInstanceInit)
86 errorMessage <<
"The composition couldn't be started because its vuoInstanceInit() function couldn't be found : " << dlerror();
90 vuoInstanceTriggerStart = (vuoInstanceTriggerStartType) dlsym(compositionBinaryHandle,
"vuoInstanceTriggerStart");
91 if (! vuoInstanceTriggerStart)
93 errorMessage <<
"The composition couldn't be started because its vuoInstanceTriggerStart() function couldn't be found : " << dlerror();
97 vuoInstanceTriggerStop = (vuoInstanceTriggerStopType) dlsym(compositionBinaryHandle,
"vuoInstanceTriggerStop");
98 if (! vuoInstanceTriggerStop)
100 errorMessage <<
"The composition couldn't be started because its vuoInstanceTriggerStop() function couldn't be found : " << dlerror();
104 vuoSetInputPortValue = (vuoSetInputPortValueType) dlsym(compositionBinaryHandle,
"vuoSetInputPortValue");
105 if (! vuoSetInputPortValue)
107 errorMessage <<
"The composition couldn't be started because its vuoSetInputPortValue() function couldn't be found : " << dlerror();
111 getPublishedInputPortCount = (getPublishedInputPortCountType) dlsym(compositionBinaryHandle,
"getPublishedInputPortCount");
112 if (! getPublishedInputPortCount)
114 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortCount() function couldn't be found : " << dlerror();
118 getPublishedOutputPortCount = (getPublishedOutputPortCountType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortCount");
119 if (! getPublishedOutputPortCount)
121 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortCount() function couldn't be found : " << dlerror();
125 getPublishedInputPortNames = (getPublishedInputPortNamesType) dlsym(compositionBinaryHandle,
"getPublishedInputPortNames");
126 if (! getPublishedInputPortNames)
128 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortNames() function couldn't be found : " << dlerror();
132 getPublishedOutputPortNames = (getPublishedOutputPortNamesType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortNames");
133 if (! getPublishedOutputPortNames)
135 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortNames() function couldn't be found : " << dlerror();
139 getPublishedInputPortTypes = (getPublishedInputPortTypesType) dlsym(compositionBinaryHandle,
"getPublishedInputPortTypes");
140 if (! getPublishedInputPortTypes)
142 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortTypes() function couldn't be found : " << dlerror();
146 getPublishedOutputPortTypes = (getPublishedOutputPortTypesType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortTypes");
147 if (! getPublishedOutputPortTypes)
149 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortTypes() function couldn't be found : " << dlerror();
153 getPublishedInputPortDetails = (getPublishedInputPortDetailsType) dlsym(compositionBinaryHandle,
"getPublishedInputPortDetails");
154 if (! getPublishedInputPortDetails)
156 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortDetails() function couldn't be found : " << dlerror();
160 getPublishedOutputPortDetails = (getPublishedOutputPortDetailsType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortDetails");
161 if (! getPublishedOutputPortDetails)
163 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortDetails() function couldn't be found : " << dlerror();
167 firePublishedInputPortEvent = (firePublishedInputPortEventType) dlsym(compositionBinaryHandle,
"firePublishedInputPortEvent");
168 if (! firePublishedInputPortEvent)
170 errorMessage <<
"The composition couldn't be started because its firePublishedInputPortEvent() function couldn't be found : " << dlerror();
174 setPublishedInputPortValue = (setPublishedInputPortValueType) dlsym(compositionBinaryHandle,
"setPublishedInputPortValue");
175 if (! setPublishedInputPortValue)
177 errorMessage <<
"The composition couldn't be started because its setPublishedInputPortValue() function couldn't be found : " << dlerror();
181 getPublishedInputPortValue = (getPublishedInputPortValueType) dlsym(compositionBinaryHandle,
"getPublishedInputPortValue");
182 if (! getPublishedInputPortValue)
184 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortValue() function couldn't be found : " << dlerror();
188 getPublishedOutputPortValue = (getPublishedOutputPortValueType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortValue");
189 if (! getPublishedOutputPortValue)
191 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortValue() function couldn't be found : " << dlerror();
207 _hasZmqConnection =
true;
208 zmqContext = (_zmqContext ? _zmqContext : zmq_init(1));
210 zmqControl = zmq_socket(zmqContext,ZMQ_REP);
211 zmqTelemetry = zmq_socket(zmqContext,ZMQ_PUB);
214 ostringstream errorMessage;
218 errorMessage <<
"The composition couldn't start because it couldn't establish communication to be controlled by the runner : " << zmq_strerror(errno) << endl;
222 zmqSelfReceive = zmq_socket(zmqContext, ZMQ_PAIR);
223 if (zmq_bind(zmqSelfReceive,
"inproc://vuo-runtime-self") != 0)
225 errorMessage <<
"Couldn't bind self-receive socket: " << zmq_strerror(errno) <<
" " << errno << endl;
229 zmqSelfSend = zmq_socket(zmqContext, ZMQ_PAIR);
230 if (zmq_connect(zmqSelfSend,
"inproc://vuo-runtime-self") != 0)
232 errorMessage <<
"Couldn't connect self-send socket: " << zmq_strerror(errno) <<
" " << errno << endl;
236 const int highWaterMark = 0;
237 if(zmq_setsockopt(zmqTelemetry,ZMQ_SNDHWM,&highWaterMark,
sizeof(highWaterMark)))
239 errorMessage <<
"Couldn't set high-water mark on telemetry socket: " << zmq_strerror(errno) <<
" " << errno << endl;
245 errorMessage <<
"The composition couldn't start because it couldn't establish communication to be listened to by the runner : " << zmq_strerror(errno) << endl;
251 zmq_close(zmqControl);
253 zmq_close(zmqSelfSend);
255 zmq_close(zmqSelfReceive);
256 zmqSelfReceive = NULL;
257 zmq_close(zmqTelemetry);
269 if (! _hasZmqConnection)
272 zmq_term(zmqContext);
275 _hasZmqConnection =
false;
283 return _hasZmqConnection;
297 void VuoRuntimeCommunicator::sendControlReply(
enum VuoControlReply reply, zmq_msg_t *messages,
unsigned int messageCount)
299 vuoSend(
"VuoControl",zmqControl,reply,messages,messageCount,
false,NULL);
// Sends a telemetry message of kind `type`, made of `messageCount` parts taken from `messages`,
// over the `zmqTelemetry` socket.
// NOTE(review): some lines of this function are not visible in this excerpt (anything between the
// signature and the dispatch_sync call, and the end of the block) — confirm against the full file.
305 void VuoRuntimeCommunicator::sendTelemetry(
enum VuoTelemetry type, zmq_msg_t *messages,
unsigned int messageCount)
// The send runs inside a block executed synchronously on `telemetryQueue`.
311 dispatch_sync(telemetryQueue, ^{
313 vuoSend(
"VuoTelemetry",zmqTelemetry,type,messages,messageCount,
true,NULL);
324 if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier)))
327 zmq_msg_t messages[2];
341 if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier)))
344 zmq_msg_t messages[2];
358 bool isSendingAllTelemetry = isSubscribedToAllTelemetry(compositionIdentifier);
359 bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
360 if (! (isSendingAllTelemetry || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
363 zmq_msg_t messages[5];
368 vuoInitMessageWithString(&messages[4], (portDataSummary && (isSendingAllTelemetry || isSendingPortTelemetry)) ? portDataSummary :
"");
380 bool isSendingAllTelemetry = isSubscribedToAllTelemetry(compositionIdentifier);
381 bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
382 if (! (isSendingAllTelemetry || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
385 zmq_msg_t messages[5];
390 vuoInitMessageWithString(&messages[4], (portDataSummary && (isSendingAllTelemetry || isSendingPortTelemetry)) ? portDataSummary :
"");
400 zmq_msg_t messages[3];
428 bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier);
429 if (! (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToEventTelemetry(compositionIdentifier) || isSendingPortTelemetry))
432 zmq_msg_t messages[2];
444 if (! _hasZmqConnection)
447 zmq_msg_t messages[1];
468 zmq_close(zmqControl);
475 void VuoRuntimeCommunicator::subscribeToPortDataTelemetry(
const char *compositionIdentifier,
const char *portIdentifer)
477 portsSendingDataTelemetry[compositionIdentifier].insert(portIdentifer);
483 void VuoRuntimeCommunicator::unsubscribeFromPortDataTelemetry(
const char *compositionIdentifier,
const char *portIdentifer)
485 map<string, set<string> >::iterator iter1 = portsSendingDataTelemetry.find(compositionIdentifier);
486 if (iter1 != portsSendingDataTelemetry.end())
488 set<string>::iterator iter2 = iter1->second.find(portIdentifer);
489 if (iter2 != iter1->second.end())
491 iter1->second.erase(iter2);
492 if (iter1->second.empty())
493 portsSendingDataTelemetry.erase(iter1);
// Reports whether port `portIdentifer` of composition `compositionIdentifier` is recorded in
// `portsSendingDataTelemetry`.
// NOTE(review): the return for the case where the composition has no entry is not visible in
// this excerpt — confirm against the full file.
501 bool VuoRuntimeCommunicator::isSubscribedToPortDataTelemetry(
const char *compositionIdentifier,
const char *portIdentifer)
503 map<string, set<string> >::iterator iter = portsSendingDataTelemetry.find(compositionIdentifier);
504 if (iter != portsSendingDataTelemetry.end())
// The composition has an entry: the answer is whether the port is in its set.
505 return (iter->second.find(portIdentifer) != iter->second.end());
513 void VuoRuntimeCommunicator::subscribeToEventTelemetry(
const char *compositionIdentifier)
515 compositionsSendingEventTelemetry.insert(compositionIdentifier);
521 void VuoRuntimeCommunicator::unsubscribeFromEventTelemetry(
const char *compositionIdentifier)
523 set<string>::iterator iter = compositionsSendingEventTelemetry.find(compositionIdentifier);
524 if (iter != compositionsSendingEventTelemetry.end())
525 compositionsSendingEventTelemetry.erase(iter);
531 bool VuoRuntimeCommunicator::isSubscribedToEventTelemetry(
const char *compositionIdentifier)
533 set<string>::iterator iter = compositionsSendingEventTelemetry.find(compositionIdentifier);
534 return (iter != compositionsSendingEventTelemetry.end());
540 void VuoRuntimeCommunicator::subscribeToAllTelemetry(
const char *compositionIdentifier)
542 compositionsSendingAllTelemetry.insert(compositionIdentifier);
548 void VuoRuntimeCommunicator::unsubscribeFromAllTelemetry(
const char *compositionIdentifier)
550 set<string>::iterator iter = compositionsSendingAllTelemetry.find(compositionIdentifier);
551 if (iter != compositionsSendingAllTelemetry.end())
552 compositionsSendingAllTelemetry.erase(iter);
558 bool VuoRuntimeCommunicator::isSubscribedToAllTelemetry(
const char *compositionIdentifier)
560 set<string>::iterator iter = compositionsSendingAllTelemetry.find(compositionIdentifier);
561 return (iter != compositionsSendingAllTelemetry.end());
571 return (isSubscribedToAllTelemetry(compositionIdentifier) || isSubscribedToPortDataTelemetry(compositionIdentifier, portIdentifier));
// Produces a new details string for a port of data type `type`: the JSON parsed from `details`,
// plus a "menuItems" array (one {"value", "name"} object per allowed value of the type) when at
// least one item was collected. The string is a strdup() copy of the serialized JSON.
// NOTE(review): this excerpt omits some lines of the function (what happens when a lookup fails,
// the definitions of `js`, `menuItem` and `menuItems`, and the return statement) — confirm
// against the full file before relying on the notes below.
579 char * VuoRuntimeCommunicator::mergeEnumDetails(
string type,
const char *details)
// Each helper the type may provide is looked up by name with dlsym(RTLD_SELF, ...).
// Lookup 1: "<type>_getAllowedValues", called below with no arguments to obtain the list of values.
581 string allowedValuesFunctionName = type +
"_getAllowedValues";
582 typedef void *(*allowedValuesFunctionType)(void);
583 allowedValuesFunctionType allowedValuesFunction = (allowedValuesFunctionType)dlsym(RTLD_SELF, allowedValuesFunctionName.c_str());
584 if (!allowedValuesFunction)
// Lookup 2: "<type>_getJson", typed here as taking an int64_t value and returning a json_object.
587 string getJsonFunctionName = type +
"_getJson";
588 typedef json_object *(*getJsonFunctionType)(int64_t);
589 getJsonFunctionType getJsonFunction = (getJsonFunctionType)dlsym(RTLD_SELF, getJsonFunctionName.c_str());
590 if (!getJsonFunction)
// Lookup 3: "<type>_getSummary", typed here as taking an int64_t value and returning a C string.
593 string summaryFunctionName = type +
"_getSummary";
594 typedef char *(*summaryFunctionType)(int64_t);
595 summaryFunctionType summaryFunction = (summaryFunctionType)dlsym(RTLD_SELF, summaryFunctionName.c_str());
596 if (!summaryFunction)
// Lookup 4: "VuoListGetCount_<type>", used below to get the number of allowed values.
599 string listCountFunctionName =
"VuoListGetCount_" + type;
600 typedef unsigned long (*listCountFunctionType)(
void *);
601 listCountFunctionType listCountFunction = (listCountFunctionType)dlsym(RTLD_SELF, listCountFunctionName.c_str());
602 if (!listCountFunction)
// Lookup 5: "VuoListGetValue_<type>", used below to fetch the value at a given index.
605 string listValueFunctionName =
"VuoListGetValue_" + type;
606 typedef int64_t (*listValueFunctionType)(
void *,
unsigned long);
607 listValueFunctionType listValueFunction = (listValueFunctionType)dlsym(RTLD_SELF, listValueFunctionName.c_str());
608 if (!listValueFunction)
// Parse the existing details so the menu items can be added to them.
611 json_object *detailsJson = json_tokener_parse(details);
615 void *allowedValues = allowedValuesFunction();
617 unsigned long listCount = listCountFunction(allowedValues);
// List indices run from 1 through listCount inclusive.
619 for (
unsigned long i = 1; i <= listCount; ++i)
621 int64_t value = listValueFunction(allowedValues, i);
// `js` is defined on a line not visible here (presumably the result of getJsonFunction(value) — verify).
// Only values whose JSON form is a string are used as menu item keys.
623 if (!json_object_is_type(js, json_type_string))
625 const char *key = json_object_get_string(js);
626 char *summary = summaryFunction(value);
// One menu item per value: "value" holds the JSON string key, "name" holds the summary text.
629 json_object_object_add(menuItem,
"value", json_object_new_string(key));
630 json_object_object_add(menuItem,
"name", json_object_new_string(summary));
631 json_object_array_add(menuItems, menuItem);
// Attach the array only if at least one menu item was added.
637 if (json_object_array_length(menuItems))
638 json_object_object_add(detailsJson,
"menuItems", menuItems);
// Copy the serialized JSON before dropping this function's reference to detailsJson.
640 char *newDetails = strdup(json_object_to_json_string(detailsJson));
641 json_object_put(detailsJson);
// Gathers this process's CPU usage with getrusage(RUSAGE_SELF) and packs it into two message parts:
// messages[0] holds the user CPU time and messages[1] the system CPU time, each a uint64_t count
// of microseconds.
// NOTE(review): the declaration of `r`, the handling after the error log, the use of `blocking`,
// and the actual send are on lines not visible in this excerpt — confirm against the full file.
646 void VuoRuntimeCommunicator::sendHeartbeat(
bool blocking)
// getrusage() returns non-zero on failure; the error text is logged from errno.
649 if(getrusage(RUSAGE_SELF,&r))
651 VUserLog(
"The composition couldn't get the information to send for VuoTelemetryStats : %s", strerror(errno));
655 zmq_msg_t messages[2];
// User CPU time: whole seconds converted to microseconds, plus the microsecond remainder.
658 uint64_t utime = r.ru_utime.tv_sec*USEC_PER_SEC+r.ru_utime.tv_usec;
659 zmq_msg_init_size(&messages[0],
sizeof utime);
660 memcpy(zmq_msg_data(&messages[0]), &utime,
sizeof utime);
// System CPU time, converted the same way.
664 uint64_t stime = r.ru_stime.tv_sec*USEC_PER_SEC+r.ru_stime.tv_usec;
665 zmq_msg_init_size(&messages[1],
sizeof stime);
666 memcpy(zmq_msg_data(&messages[1]), &stime,
sizeof stime);
684 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
685 telemetryTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
686 dispatch_source_set_timer(telemetryTimer, dispatch_walltime(NULL, 0), NSEC_PER_SEC/1000, NSEC_PER_SEC/1000);
688 dispatch_source_set_event_handler(telemetryTimer, ^{
692 dispatch_source_set_cancel_handler(telemetryTimer, ^{
693 dispatch_semaphore_signal(telemetryCanceled);
696 dispatch_resume(telemetryTimer);
707 dispatch_source_cancel(telemetryTimer);
708 dispatch_semaphore_wait(telemetryCanceled, DISPATCH_TIME_FOREVER);
709 dispatch_sync(telemetryQueue, ^{
715 zmq_close(zmqTelemetry);
719 dispatch_release(telemetryTimer);
720 telemetryTimer = NULL;
731 controlTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, controlQueue);
732 dispatch_source_set_timer(controlTimer, dispatch_walltime(NULL,0), NSEC_PER_SEC/1000, NSEC_PER_SEC/1000);
734 dispatch_source_set_event_handler(controlTimer, ^{
738 zmq_pollitem_t items[]=
740 {zmqControl,0,ZMQ_POLLIN,0},
741 {zmqSelfReceive,0,ZMQ_POLLIN,0},
745 zmq_poll(items,itemCount,timeout);
746 if(!(items[0].revents & ZMQ_POLLIN))
756 dispatch_source_set_timer(telemetryTimer, dispatch_walltime(NULL, 0), NSEC_PER_SEC/2, NSEC_PER_SEC/100);
789 bool shouldUseInterprocessSerialization =
vuoReceiveBool(zmqControl, NULL);
794 char *valueAsString = persistentState->
nodeRegistry->
getPortValue(&compositionState, portIdentifier, shouldUseInterprocessSerialization);
796 zmq_msg_t messages[1];
802 free(compositionIdentifier);
803 free(portIdentifier);
816 zmq_msg_t messages[1];
822 free(compositionIdentifier);
823 free(portIdentifier);
837 free(compositionIdentifier);
838 free(portIdentifier);
852 free(compositionIdentifier);
853 free(portIdentifier);
859 int count = getPublishedInputPortCount();
860 char **names = getPublishedInputPortNames();
862 zmq_msg_t messages[count];
863 for (
int i = 0; i < count; ++i)
871 int count = getPublishedOutputPortCount();
872 char **names = getPublishedOutputPortNames();
874 zmq_msg_t messages[count];
875 for (
int i = 0; i < count; ++i)
883 int count = getPublishedInputPortCount();
884 char **names = getPublishedInputPortTypes();
886 zmq_msg_t messages[count];
887 for (
int i = 0; i < count; ++i)
895 int count = getPublishedOutputPortCount();
896 char **names = getPublishedOutputPortTypes();
898 zmq_msg_t messages[count];
899 for (
int i = 0; i < count; ++i)
907 int count = getPublishedInputPortCount();
908 char **types = getPublishedInputPortTypes();
909 char **names = getPublishedInputPortDetails();
911 zmq_msg_t messages[count];
912 for (
int i = 0; i < count; ++i)
914 char *newDetails = mergeEnumDetails(types[i], names[i]);
916 names[i] = newDetails;
927 int count = getPublishedOutputPortCount();
928 char **names = getPublishedOutputPortDetails();
930 zmq_msg_t messages[count];
931 for (
int i = 0; i < count; ++i)
941 char **names = (
char **)malloc(count *
sizeof(
char *));
942 for (
int i = 0; i < count; ++i)
945 firePublishedInputPortEvent(names, count);
947 for (
int i = 0; i < count; ++i)
963 setPublishedInputPortValue(portIdentifier, valueAsString);
965 free(portIdentifier);
978 bool shouldUseInterprocessSerialization =
vuoReceiveBool(zmqControl, NULL);
980 char *valueAsString = getPublishedInputPortValue(portIdentifier, shouldUseInterprocessSerialization);
981 free(portIdentifier);
982 zmq_msg_t messages[1];
994 bool shouldUseInterprocessSerialization =
vuoReceiveBool(zmqControl, NULL);
996 char *valueAsString = getPublishedOutputPortValue(portIdentifier, shouldUseInterprocessSerialization);
997 free(portIdentifier);
998 zmq_msg_t messages[1];
1000 free(valueAsString);
1017 zmq_msg_t messages[1];
1023 free(compositionIdentifier);
1024 free(portIdentifier);
1041 free(compositionIdentifier);
1042 free(portIdentifier);
1054 free(compositionIdentifier);
1066 free(compositionIdentifier);
1078 free(compositionIdentifier);
1090 free(compositionIdentifier);
1096 dispatch_source_set_cancel_handler(controlTimer, ^{
1099 dispatch_semaphore_signal(controlCanceled);
1103 dispatch_resume(controlTimer);
1114 dispatch_source_cancel(controlTimer);
1127 zmq_msg_init_size(&message,
sizeof z);
1128 memcpy(zmq_msg_data(&message), &z,
sizeof z);
1129 if (zmq_msg_send(&message,
static_cast<zmq_msg_t *
>(zmqSelfSend), 0) == -1)
1130 VUserLog(
"Couldn't break: %s (%d)", zmq_strerror(errno), errno);
1131 zmq_msg_close(&message);
1142 dispatch_semaphore_wait(controlCanceled, DISPATCH_TIME_FOREVER);
1143 dispatch_sync(controlQueue, ^{
1144 zmq_close(zmqControl);
1148 dispatch_release(controlTimer);
1149 controlTimer = NULL;
1153 zmq_close(zmqSelfSend);
1159 zmq_close(zmqSelfReceive);
1160 zmqSelfReceive = NULL;
1173 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
1174 dispatch_async(queue, ^{
1181 usleep(USEC_PER_SEC / 100);
1182 }
while (ret == -1 && readError == EAGAIN);
1184 _hasZmqConnection =
false;