26 this->persistentState = persistentState;
28 _hasZmqConnection =
false;
32 zmqSelfReceive = NULL;
36 controlQueue = dispatch_queue_create(
"org.vuo.runtime.control", NULL);
37 telemetryQueue = dispatch_queue_create(
"org.vuo.runtime.telemetry", NULL);
39 telemetryTimer = NULL;
40 controlCanceled = dispatch_semaphore_create(0);
41 telemetryCanceled = dispatch_semaphore_create(0);
45 vuoInstanceInit = NULL;
46 vuoInstanceTriggerStart = NULL;
47 vuoInstanceTriggerStop = NULL;
48 vuoSetInputPortValue = NULL;
49 getPublishedInputPortCount = NULL;
50 getPublishedOutputPortCount = NULL;
51 getPublishedInputPortNames = NULL;
52 getPublishedOutputPortNames = NULL;
53 getPublishedInputPortTypes = NULL;
54 getPublishedOutputPortTypes = NULL;
55 getPublishedInputPortDetails = NULL;
56 getPublishedOutputPortDetails = NULL;
57 firePublishedInputPortEvent = NULL;
58 setPublishedInputPortValue = NULL;
59 getPublishedInputPortValue = NULL;
60 getPublishedOutputPortValue = NULL;
68 dispatch_release(controlQueue);
69 dispatch_release(telemetryQueue);
70 dispatch_release(controlCanceled);
71 dispatch_release(telemetryCanceled);
81 ostringstream errorMessage;
83 vuoInstanceInit = (vuoInstanceInitType) dlsym(compositionBinaryHandle,
"vuoInstanceInit");
84 if (! vuoInstanceInit)
86 errorMessage <<
"The composition couldn't be started because its vuoInstanceInit() function couldn't be found : " << dlerror();
90 vuoInstanceTriggerStart = (vuoInstanceTriggerStartType) dlsym(compositionBinaryHandle,
"vuoInstanceTriggerStart");
91 if (! vuoInstanceTriggerStart)
93 errorMessage <<
"The composition couldn't be started because its vuoInstanceTriggerStart() function couldn't be found : " << dlerror();
97 vuoInstanceTriggerStop = (vuoInstanceTriggerStopType) dlsym(compositionBinaryHandle,
"vuoInstanceTriggerStop");
98 if (! vuoInstanceTriggerStop)
100 errorMessage <<
"The composition couldn't be started because its vuoInstanceTriggerStop() function couldn't be found : " << dlerror();
104 vuoSetInputPortValue = (vuoSetInputPortValueType) dlsym(compositionBinaryHandle,
"vuoSetInputPortValue");
105 if (! vuoSetInputPortValue)
107 errorMessage <<
"The composition couldn't be started because its vuoSetInputPortValue() function couldn't be found : " << dlerror();
111 getPublishedInputPortCount = (getPublishedInputPortCountType) dlsym(compositionBinaryHandle,
"getPublishedInputPortCount");
112 if (! getPublishedInputPortCount)
114 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortCount() function couldn't be found : " << dlerror();
118 getPublishedOutputPortCount = (getPublishedOutputPortCountType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortCount");
119 if (! getPublishedOutputPortCount)
121 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortCount() function couldn't be found : " << dlerror();
125 getPublishedInputPortNames = (getPublishedInputPortNamesType) dlsym(compositionBinaryHandle,
"getPublishedInputPortNames");
126 if (! getPublishedInputPortNames)
128 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortNames() function couldn't be found : " << dlerror();
132 getPublishedOutputPortNames = (getPublishedOutputPortNamesType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortNames");
133 if (! getPublishedOutputPortNames)
135 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortNames() function couldn't be found : " << dlerror();
139 getPublishedInputPortTypes = (getPublishedInputPortTypesType) dlsym(compositionBinaryHandle,
"getPublishedInputPortTypes");
140 if (! getPublishedInputPortTypes)
142 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortTypes() function couldn't be found : " << dlerror();
146 getPublishedOutputPortTypes = (getPublishedOutputPortTypesType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortTypes");
147 if (! getPublishedOutputPortTypes)
149 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortTypes() function couldn't be found : " << dlerror();
153 getPublishedInputPortDetails = (getPublishedInputPortDetailsType) dlsym(compositionBinaryHandle,
"getPublishedInputPortDetails");
154 if (! getPublishedInputPortDetails)
156 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortDetails() function couldn't be found : " << dlerror();
160 getPublishedOutputPortDetails = (getPublishedOutputPortDetailsType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortDetails");
161 if (! getPublishedOutputPortDetails)
163 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortDetails() function couldn't be found : " << dlerror();
167 firePublishedInputPortEvent = (firePublishedInputPortEventType) dlsym(compositionBinaryHandle,
"firePublishedInputPortEvent");
168 if (! firePublishedInputPortEvent)
170 errorMessage <<
"The composition couldn't be started because its firePublishedInputPortEvent() function couldn't be found : " << dlerror();
174 setPublishedInputPortValue = (setPublishedInputPortValueType) dlsym(compositionBinaryHandle,
"setPublishedInputPortValue");
175 if (! setPublishedInputPortValue)
177 errorMessage <<
"The composition couldn't be started because its setPublishedInputPortValue() function couldn't be found : " << dlerror();
181 getPublishedInputPortValue = (getPublishedInputPortValueType) dlsym(compositionBinaryHandle,
"getPublishedInputPortValue");
182 if (! getPublishedInputPortValue)
184 errorMessage <<
"The composition couldn't be started because its getPublishedInputPortValue() function couldn't be found : " << dlerror();
188 getPublishedOutputPortValue = (getPublishedOutputPortValueType) dlsym(compositionBinaryHandle,
"getPublishedOutputPortValue");
189 if (! getPublishedOutputPortValue)
191 errorMessage <<
"The composition couldn't be started because its getPublishedOutputPortValue() function couldn't be found : " << dlerror();
207 _hasZmqConnection =
true;
208 zmqContext = (_zmqContext ? _zmqContext : zmq_init(1));
210 zmqControl = zmq_socket(zmqContext,ZMQ_REP);
211 zmqTelemetry = zmq_socket(zmqContext,ZMQ_PUB);
214 ostringstream errorMessage;
218 errorMessage <<
"The composition couldn't start because it couldn't establish communication to be controlled by the runner : " << zmq_strerror(errno) << endl;
222 zmqSelfReceive = zmq_socket(zmqContext, ZMQ_PAIR);
223 if (zmq_bind(zmqSelfReceive,
"inproc://vuo-runtime-self") != 0)
225 errorMessage <<
"Couldn't bind self-receive socket: " << zmq_strerror(errno) <<
" " << errno << endl;
229 zmqSelfSend = zmq_socket(zmqContext, ZMQ_PAIR);
230 if (zmq_connect(zmqSelfSend,
"inproc://vuo-runtime-self") != 0)
232 errorMessage <<
"Couldn't connect self-send socket: " << zmq_strerror(errno) <<
" " << errno << endl;
238 errorMessage <<
"The composition couldn't start because it couldn't establish communication to be listened to by the runner : " << zmq_strerror(errno) << endl;
244 zmq_close(zmqControl);
246 zmq_close(zmqSelfSend);
248 zmq_close(zmqSelfReceive);
249 zmqSelfReceive = NULL;
250 zmq_close(zmqTelemetry);
262 if (! _hasZmqConnection)
265 zmq_term(zmqContext);
268 _hasZmqConnection =
false;
276 return _hasZmqConnection;
290 void VuoRuntimeCommunicator::sendControlReply(
enum VuoControlReply reply, zmq_msg_t *messages,
unsigned int messageCount)
292 vuoSend(
"VuoControl",zmqControl,reply,messages,messageCount,
false,NULL);
298 void VuoRuntimeCommunicator::sendTelemetry(
enum VuoTelemetry type, zmq_msg_t *messages,
unsigned int messageCount)
304 dispatch_sync(telemetryQueue, ^{
306 vuoSend(
"VuoTelemetry",zmqTelemetry,type,messages,messageCount,
true,NULL);
320 zmq_msg_t messages[2];
337 zmq_msg_t messages[2];
352 bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(
compositionIdentifier, portIdentifier);
353 if (! (isSendingAllTelemetry || isSubscribedToEventTelemetry(
compositionIdentifier) || isSendingPortTelemetry))
356 zmq_msg_t messages[5];
361 vuoInitMessageWithString(&messages[4], (portDataSummary && (isSendingAllTelemetry || isSendingPortTelemetry)) ? portDataSummary :
"");
374 bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(
compositionIdentifier, portIdentifier);
375 if (! (isSendingAllTelemetry || isSubscribedToEventTelemetry(
compositionIdentifier) || isSendingPortTelemetry))
378 zmq_msg_t messages[5];
383 vuoInitMessageWithString(&messages[4], (portDataSummary && (isSendingAllTelemetry || isSendingPortTelemetry)) ? portDataSummary :
"");
393 zmq_msg_t messages[3];
421 bool isSendingPortTelemetry = isSubscribedToPortDataTelemetry(
compositionIdentifier, portIdentifier);
425 zmq_msg_t messages[2];
437 if (! _hasZmqConnection)
440 zmq_msg_t messages[1];
461 zmq_close(zmqControl);
468 void VuoRuntimeCommunicator::subscribeToPortDataTelemetry(
const char *
compositionIdentifier,
const char *portIdentifer)
476 void VuoRuntimeCommunicator::unsubscribeFromPortDataTelemetry(
const char *
compositionIdentifier,
const char *portIdentifer)
479 if (iter1 != portsSendingDataTelemetry.end())
481 set<string>::iterator iter2 = iter1->second.find(portIdentifer);
482 if (iter2 != iter1->second.end())
484 iter1->second.erase(iter2);
485 if (iter1->second.empty())
486 portsSendingDataTelemetry.erase(iter1);
494 bool VuoRuntimeCommunicator::isSubscribedToPortDataTelemetry(
const char *
compositionIdentifier,
const char *portIdentifer)
497 if (iter != portsSendingDataTelemetry.end())
498 return (iter->second.find(portIdentifer) != iter->second.end());
517 if (iter != compositionsSendingEventTelemetry.end())
518 compositionsSendingEventTelemetry.erase(iter);
527 return (iter != compositionsSendingEventTelemetry.end());
544 if (iter != compositionsSendingAllTelemetry.end())
545 compositionsSendingAllTelemetry.erase(iter);
554 return (iter != compositionsSendingAllTelemetry.end());
572 char * VuoRuntimeCommunicator::mergeEnumDetails(
string type,
const char *details)
574 string allowedValuesFunctionName = type +
"_getAllowedValues";
575 typedef void *(*allowedValuesFunctionType)(void);
576 allowedValuesFunctionType allowedValuesFunction = (allowedValuesFunctionType)dlsym(RTLD_SELF, allowedValuesFunctionName.c_str());
577 if (!allowedValuesFunction)
580 string getJsonFunctionName = type +
"_getJson";
581 typedef json_object *(*getJsonFunctionType)(int64_t);
582 getJsonFunctionType getJsonFunction = (getJsonFunctionType)dlsym(RTLD_SELF, getJsonFunctionName.c_str());
583 if (!getJsonFunction)
586 string summaryFunctionName = type +
"_getSummary";
587 typedef char *(*summaryFunctionType)(int64_t);
588 summaryFunctionType summaryFunction = (summaryFunctionType)dlsym(RTLD_SELF, summaryFunctionName.c_str());
589 if (!summaryFunction)
592 string listCountFunctionName =
"VuoListGetCount_" + type;
593 typedef unsigned long (*listCountFunctionType)(
void *);
594 listCountFunctionType listCountFunction = (listCountFunctionType)dlsym(RTLD_SELF, listCountFunctionName.c_str());
595 if (!listCountFunction)
598 string listValueFunctionName =
"VuoListGetValue_" + type;
599 typedef int64_t (*listValueFunctionType)(
void *,
unsigned long);
600 listValueFunctionType listValueFunction = (listValueFunctionType)dlsym(RTLD_SELF, listValueFunctionName.c_str());
601 if (!listValueFunction)
604 json_object *detailsJson = json_tokener_parse(details);
608 void *allowedValues = allowedValuesFunction();
610 unsigned long listCount = listCountFunction(allowedValues);
612 for (
unsigned long i = 1; i <= listCount; ++i)
614 int64_t value = listValueFunction(allowedValues, i);
616 if (!json_object_is_type(js, json_type_string))
618 const char *key = json_object_get_string(js);
619 char *summary = summaryFunction(value);
622 json_object_object_add(menuItem,
"value", json_object_new_string(key));
623 json_object_object_add(menuItem,
"name", json_object_new_string(summary));
624 json_object_array_add(menuItems, menuItem);
630 if (json_object_array_length(menuItems))
631 json_object_object_add(detailsJson,
"menuItems", menuItems);
633 char *newDetails = strdup(json_object_to_json_string(detailsJson));
634 json_object_put(detailsJson);
639 void VuoRuntimeCommunicator::sendHeartbeat(
bool blocking)
642 if(getrusage(RUSAGE_SELF,&r))
644 VUserLog(
"The composition couldn't get the information to send for VuoTelemetryStats : %s", strerror(errno));
648 zmq_msg_t messages[2];
651 uint64_t utime = r.ru_utime.tv_sec*USEC_PER_SEC+r.ru_utime.tv_usec;
652 zmq_msg_init_size(&messages[0],
sizeof utime);
653 memcpy(zmq_msg_data(&messages[0]), &utime,
sizeof utime);
657 uint64_t stime = r.ru_stime.tv_sec*USEC_PER_SEC+r.ru_stime.tv_usec;
658 zmq_msg_init_size(&messages[1],
sizeof stime);
659 memcpy(zmq_msg_data(&messages[1]), &stime,
sizeof stime);
677 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
678 telemetryTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
679 dispatch_source_set_timer(telemetryTimer, dispatch_walltime(NULL, 0), NSEC_PER_SEC/1000, NSEC_PER_SEC/1000);
681 dispatch_source_set_event_handler(telemetryTimer, ^{
685 dispatch_source_set_cancel_handler(telemetryTimer, ^{
686 dispatch_semaphore_signal(telemetryCanceled);
689 dispatch_resume(telemetryTimer);
700 dispatch_source_cancel(telemetryTimer);
701 dispatch_semaphore_wait(telemetryCanceled, DISPATCH_TIME_FOREVER);
702 dispatch_sync(telemetryQueue, ^{
708 zmq_close(zmqTelemetry);
712 dispatch_release(telemetryTimer);
713 telemetryTimer = NULL;
724 controlTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, controlQueue);
725 dispatch_source_set_timer(controlTimer, dispatch_walltime(NULL,0), NSEC_PER_SEC/1000, NSEC_PER_SEC/1000);
727 dispatch_source_set_event_handler(controlTimer, ^{
731 zmq_pollitem_t items[]=
733 {zmqControl,0,ZMQ_POLLIN,0},
734 {zmqSelfReceive,0,ZMQ_POLLIN,0},
738 zmq_poll(items,itemCount,timeout);
739 if(!(items[0].revents & ZMQ_POLLIN))
749 dispatch_source_set_timer(telemetryTimer, dispatch_walltime(NULL, 0), NSEC_PER_SEC/2, NSEC_PER_SEC/100);
782 bool shouldUseInterprocessSerialization =
vuoReceiveBool(zmqControl, NULL);
787 char *valueAsString = persistentState->
nodeRegistry->
getPortValue(&compositionState, portIdentifier, shouldUseInterprocessSerialization);
789 zmq_msg_t messages[1];
795 free(compositionIdentifier);
796 free(portIdentifier);
809 zmq_msg_t messages[1];
815 free(compositionIdentifier);
816 free(portIdentifier);
830 free(compositionIdentifier);
831 free(portIdentifier);
845 free(compositionIdentifier);
846 free(portIdentifier);
852 int count = getPublishedInputPortCount();
853 char **names = getPublishedInputPortNames();
855 zmq_msg_t messages[count];
856 for (
int i = 0; i < count; ++i)
864 int count = getPublishedOutputPortCount();
865 char **names = getPublishedOutputPortNames();
867 zmq_msg_t messages[count];
868 for (
int i = 0; i < count; ++i)
876 int count = getPublishedInputPortCount();
877 char **names = getPublishedInputPortTypes();
879 zmq_msg_t messages[count];
880 for (
int i = 0; i < count; ++i)
888 int count = getPublishedOutputPortCount();
889 char **names = getPublishedOutputPortTypes();
891 zmq_msg_t messages[count];
892 for (
int i = 0; i < count; ++i)
900 int count = getPublishedInputPortCount();
901 char **types = getPublishedInputPortTypes();
902 char **names = getPublishedInputPortDetails();
904 zmq_msg_t messages[count];
905 for (
int i = 0; i < count; ++i)
907 char *newDetails = mergeEnumDetails(types[i], names[i]);
909 names[i] = newDetails;
920 int count = getPublishedOutputPortCount();
921 char **names = getPublishedOutputPortDetails();
923 zmq_msg_t messages[count];
924 for (
int i = 0; i < count; ++i)
934 char **names = (
char **)malloc(count *
sizeof(
char *));
935 for (
int i = 0; i < count; ++i)
938 firePublishedInputPortEvent(names, count);
940 for (
int i = 0; i < count; ++i)
956 setPublishedInputPortValue(portIdentifier, valueAsString);
958 free(portIdentifier);
971 bool shouldUseInterprocessSerialization =
vuoReceiveBool(zmqControl, NULL);
973 char *valueAsString = getPublishedInputPortValue(portIdentifier, shouldUseInterprocessSerialization);
974 free(portIdentifier);
975 zmq_msg_t messages[1];
987 bool shouldUseInterprocessSerialization =
vuoReceiveBool(zmqControl, NULL);
989 char *valueAsString = getPublishedOutputPortValue(portIdentifier, shouldUseInterprocessSerialization);
990 free(portIdentifier);
991 zmq_msg_t messages[1];
1010 zmq_msg_t messages[1];
1016 free(compositionIdentifier);
1017 free(portIdentifier);
1034 free(compositionIdentifier);
1035 free(portIdentifier);
1047 free(compositionIdentifier);
1059 free(compositionIdentifier);
1071 free(compositionIdentifier);
1083 free(compositionIdentifier);
1089 dispatch_source_set_cancel_handler(controlTimer, ^{
1092 dispatch_semaphore_signal(controlCanceled);
1096 dispatch_resume(controlTimer);
1107 dispatch_source_cancel(controlTimer);
1120 zmq_msg_init_size(&message,
sizeof z);
1121 memcpy(zmq_msg_data(&message), &z,
sizeof z);
1122 if (zmq_send(zmqSelfSend, &message, 0) != 0)
1123 VUserLog(
"Couldn't break: %s (%d)", zmq_strerror(errno), errno);
1124 zmq_msg_close(&message);
1135 dispatch_semaphore_wait(controlCanceled, DISPATCH_TIME_FOREVER);
1136 dispatch_sync(controlQueue, ^{
1137 zmq_close(zmqControl);
1141 dispatch_release(controlTimer);
1142 controlTimer = NULL;
1146 zmq_close(zmqSelfSend);
1152 zmq_close(zmqSelfReceive);
1153 zmqSelfReceive = NULL;
1166 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
1167 dispatch_async(queue, ^{
1174 usleep(USEC_PER_SEC / 100);
1175 }
while (ret == -1 && readError == EAGAIN);
1177 _hasZmqConnection =
false;