18 #include <CoreServices/CoreServices.h>
24 #include <mach-o/loader.h>
25 #include <sys/proc_info.h>
29 static const char *
mainThreadChecker =
"/Applications/Xcode.app/Contents/Developer/usr/lib/libMainThreadChecker.dylib";
39 int flags = fcntl(fd, F_GETFD);
42 VUserLog(
"Error: Couldn't get flags for desciptor %d: %s", fd, strerror(errno));
48 if (fcntl(fd, F_SETFD, flags) != 0)
49 VUserLog(
"Error: Couldn't set FD_CLOEXEC on descriptor %d: %s", fd, strerror(errno));
55 static void __attribute__((constructor)) VuoRunner_init()
59 #pragma clang diagnostic push
60 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
64 #pragma clang diagnostic pop
90 if (timeoutInSeconds >= 0)
92 int timeoutInMilliseconds = timeoutInSeconds * 1000;
93 zmq_setsockopt(zmqSocket, ZMQ_RCVTIMEO, &timeoutInMilliseconds,
sizeof timeoutInMilliseconds);
94 zmq_setsockopt(zmqSocket, ZMQ_SNDTIMEO, &timeoutInMilliseconds,
sizeof timeoutInMilliseconds);
98 zmq_setsockopt(zmqSocket, ZMQ_LINGER, &linger,
sizeof linger);
114 typedef void *(*vuoImageMakeFromJsonWithDimensionsType)(
json_object *,
unsigned int,
unsigned int);
137 vr->executablePath = executablePath;
139 vr->shouldDeleteBinariesWhenFinished = deleteExecutableWhenFinished;
140 vr->sourceDir = sourceDir;
160 const std::shared_ptr<VuoRunningCompositionLibraries> &runningCompositionLibraries,
164 vr->executablePath = compositionLoaderPath;
165 vr->dylibPath = compositionDylibPath;
166 vr->dependencyLibraries = runningCompositionLibraries;
167 vr->sourceDir = sourceDir;
169 vr->shouldDeleteBinariesWhenFinished = deleteDylibsWhenFinished;
170 runningCompositionLibraries->setDeleteResourceLibraries(deleteDylibsWhenFinished);
184 bool deleteDylibWhenFinished)
187 vr->dylibPath = dylibPath;
188 vr->shouldDeleteBinariesWhenFinished = deleteDylibWhenFinished;
189 vr->sourceDir = sourceDir;
200 dispatch_release(stoppedSemaphore);
201 dispatch_release(terminatedZMQContextSemaphore);
202 dispatch_release(beganListeningSemaphore);
203 dispatch_release(endedListeningSemaphore);
204 dispatch_release(lastFiredEventSemaphore);
205 dispatch_release(delegateQueue);
217 VUserLog(
"Error: Only call VuoRunner::setRuntimeChecking() prior to starting the composition.");
227 VuoRunner::VuoRunner(
void)
231 dependencyLibraries = NULL;
232 shouldContinueIfRunnerDies =
false;
233 shouldDeleteBinariesWhenFinished =
false;
234 isRuntimeCheckingEnabled =
false;
238 listenCanceled =
false;
239 stoppedSemaphore = dispatch_semaphore_create(1);
240 terminatedZMQContextSemaphore = dispatch_semaphore_create(0);
241 beganListeningSemaphore = dispatch_semaphore_create(0);
242 endedListeningSemaphore = dispatch_semaphore_create(1);
243 lastFiredEventSemaphore = dispatch_semaphore_create(0);
244 lastFiredEventSignaled =
false;
245 controlQueue = dispatch_queue_create(
"org.vuo.runner.control", NULL);
248 ZMQSelfReceive = NULL;
251 ZMQLoaderControl = NULL;
253 delegateQueue = dispatch_queue_create(
"org.vuo.runner.delegate", NULL);
254 arePublishedInputPortsCached =
false;
255 arePublishedOutputPortsCached =
false;
279 if (isInCurrentProcess())
281 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
282 dispatch_async(queue, ^{
288 usleep(USEC_PER_SEC / 1000);
298 stopBecauseLostContact(e.
what());
328 stopBecauseLostContact(e.
what());
336 void VuoRunner::copyDylibAndChangeId(
string dylibPath,
string &outputDylibPath)
338 string directory, file, extension;
341 const int makeTmpFileExtension = 7;
342 if (file.length() > makeTmpFileExtension)
345 string trimmedFile = file.substr(0, file.length() - makeTmpFileExtension);
351 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
352 }
while (alreadyLoaded);
362 outputDylibPath =
"/tmp/" + hash +
".dylib";
363 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
364 }
while (alreadyLoaded);
367 string newDirectory, newFile, newExtension;
370 if (newFile.length() > file.length())
371 throw VuoException(
"The composition couldn't start because the uniqued dylib name (" + newFile +
") is longer than the original dylib name (" + file +
").");
373 if (copyfile(dylibPath.c_str(), outputDylibPath.c_str(), NULL, COPYFILE_ALL))
374 throw VuoException(
"The composition couldn't start because a copy of the dylib couldn't be made.");
376 FILE *fp = fopen(outputDylibPath.c_str(),
"r+b");
378 throw VuoException(
"The composition couldn't start because the dylib's header couldn't be opened.");
381 struct mach_header_64 header;
382 if (fread(&header,
sizeof(header), 1, fp) != 1)
383 throw VuoException(
"The composition couldn't start because the dylib's header couldn't be read.");
385 if (header.magic != MH_MAGIC_64
386 || header.cputype != CPU_TYPE_X86_64)
387 throw VuoException(
"The composition couldn't start because the dylib isn't an x86_64-only (non-fat) Mach-O binary.");
389 for (
int i = 0; i < header.ncmds; ++i)
391 struct load_command lc;
392 if (fread(&lc,
sizeof(lc), 1, fp) != 1)
393 throw VuoException(
"The composition couldn't start because the dylib's command couldn't be read.");
396 if (lc.cmd == LC_ID_DYLIB)
398 fseek(fp,
sizeof(
struct dylib), SEEK_CUR);
400 size_t nameLength = lc.cmdsize -
sizeof(
struct dylib_command);
401 char *name = (
char *)calloc(nameLength + 1, 1);
402 if (fread(name, nameLength, 1, fp) != 1)
403 throw VuoException(
"The composition couldn't start because the dylib's ID command couldn't be read.");
406 fseek(fp, -nameLength, SEEK_CUR);
407 bzero(name, nameLength);
408 memcpy(name, outputDylibPath.c_str(), min(nameLength, outputDylibPath.length()));
409 fwrite(name, nameLength, 1, fp);
413 fseek(fp, lc.cmdsize-
sizeof(lc), SEEK_CUR);
416 throw VuoException(
"The composition couldn't start because the dylib's LC_ID_DYLIB command couldn't be found.");
428 void VuoRunner::startInternal(
void)
431 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
433 ZMQContext = zmq_init(1);
435 if (isInCurrentProcess())
439 bool alreadyLoaded = dlopen(dylibPath.c_str(), RTLD_NOLOAD);
445 string uniquedDylibPath;
446 copyDylibAndChangeId(dylibPath, uniquedDylibPath);
447 VDebugLog(
"\"%s\" is already loaded, so I duplicated it and changed its LC_ID_DYLIB to \"%s\".", dylibPath.c_str(), uniquedDylibPath.c_str());
449 if (shouldDeleteBinariesWhenFinished)
450 remove(dylibPath.c_str());
452 dylibPath = uniquedDylibPath;
453 shouldDeleteBinariesWhenFinished =
true;
458 throw VuoException(
"The composition couldn't start because the library '" + dylibPath +
"' couldn't be loaded : " + dlerror());
464 throw VuoException(
"The composition couldn't start because vuoInitInProcess() couldn't be found in '" + dylibPath +
"' : " + dlerror());
469 vuoInitInProcess(ZMQContext, ZMQControlURL.c_str(), ZMQTelemetryURL.c_str(),
true, getpid(), -1,
false,
486 string executableName;
487 if (isUsingCompositionLoader())
491 string dir, file, ext;
493 executableName = file;
497 string dir, file, ext;
499 string executableName = file;
501 executableName +=
"." + ext;
503 args.push_back(executableName);
515 args.push_back(
"--vuo-control=" + ZMQControlURL);
516 args.push_back(
"--vuo-telemetry=" + ZMQTelemetryURL);
521 args.push_back(
"--vuo-runner-pid=" + oss.str());
527 args.push_back(
"--vuo-runner-pipe=" + oss.str());
530 if (shouldContinueIfRunnerDies)
531 args.push_back(
"--vuo-continue-if-runner-dies");
533 if (isUsingCompositionLoader())
536 args.push_back(
"--vuo-loader=" + ZMQLoaderControlURL);
539 args.push_back(
"--vuo-pause");
544 throw VuoException(
"The composition couldn't start because a pipe couldn't be opened : " +
string(strerror(errno)));
547 int argSize = args.size();
548 for (
size_t i = 0; i < argSize; ++i)
550 size_t mallocSize = args[i].length() + 1;
551 argv[i] = (
char *)malloc(mallocSize);
552 strlcpy(argv[i], args[i].c_str(), mallocSize);
554 argv[argSize] = NULL;
556 string errorWorkingDirectory =
"The composition couldn't start because the working directory couldn't be changed to '" + sourceDir +
"' : ";
557 string errorExecutable =
"The composition couldn't start because the file '" + executablePath +
"' couldn't be executed : ";
558 string errorFork =
"The composition couldn't start because the composition process couldn't be forked : ";
559 const size_t ERROR_BUFFER_LEN = 256;
560 char errorBuffer[ERROR_BUFFER_LEN];
562 pipe(runnerReadCompositionWritePipe);
564 pid_t childPid = fork();
571 close(runnerReadCompositionWritePipe[0]);
573 pid_t grandchildPid = fork();
574 if (grandchildPid == 0)
581 if (!sourceDir.empty())
583 ret = chdir(sourceDir.c_str());
586 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
587 write(STDERR_FILENO, errorWorkingDirectory.c_str(), errorWorkingDirectory.length());
588 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
589 write(STDERR_FILENO,
"\n", 1);
594 if (isRuntimeCheckingEnabled)
597 ret = execv(executablePath.c_str(), argv);
600 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
601 write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
602 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
603 write(STDERR_FILENO,
"\n", 1);
604 for (
size_t i = 0; i < argSize; ++i)
609 else if (grandchildPid > 0)
613 write(fd[1], &grandchildPid,
sizeof(pid_t));
623 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
624 write(STDERR_FILENO, errorFork.c_str(), errorFork.length());
625 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
626 write(STDERR_FILENO,
"\n", 1);
630 else if (childPid > 0)
639 for (
size_t i = 0; i < argSize; ++i)
643 read(fd[0], &grandchildPid,
sizeof(pid_t));
650 ret = waitpid(childPid, &status, 0);
651 }
while (ret == -1 && errno == EINTR);
652 if (WIFEXITED(status) && WEXITSTATUS(status))
653 throw VuoException(
"The composition couldn't start because the parent of the composition process exited with an error.");
654 else if (WIFSIGNALED(status))
655 throw VuoException(
"The composition couldn't start because the parent of the composition process exited abnormally : " +
string(strsignal(WTERMSIG(status))));
657 if (grandchildPid > 0)
658 compositionPid = grandchildPid;
660 throw VuoException(
"The composition couldn't start because the composition process id couldn't be obtained");
664 for (
size_t i = 0; i < argSize; ++i)
667 throw VuoException(
"The composition couldn't start because the parent of the composition process couldn't be forked : " +
string(strerror(errno)));
672 if (isUsingCompositionLoader())
681 if (++numTries == 1000)
682 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication with the composition loader : " +
string(strerror(errno)));
683 usleep(USEC_PER_SEC / 1000);
690 __block
string errorMessage;
691 dispatch_sync(controlQueue, ^{
695 errorMessage = e.
what();
698 if (! errorMessage.empty())
709 pthread_detach(pthread_self());
720 void VuoRunner::setUpConnections(
void)
727 while (zmq_connect(
ZMQControl,ZMQControlURL.c_str()))
729 if (++numTries == 1000)
730 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication to control the composition : " +
string(strerror(errno)));
731 usleep(USEC_PER_SEC / 1000);
735 arePublishedInputPortsCached =
false;
736 arePublishedOutputPortsCached =
false;
737 if (isInCurrentProcess())
739 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
740 __block
string publishedPortsError;
741 dispatch_async(queue, ^{
743 getCachedPublishedPorts(
false);
744 getCachedPublishedPorts(
true);
746 publishedPortsError = e.
what();
749 while (! (arePublishedInputPortsCached && arePublishedOutputPortsCached) )
752 usleep(USEC_PER_SEC / 1000);
754 if (! publishedPortsError.empty())
760 getCachedPublishedPorts(
false);
761 getCachedPublishedPorts(
true);
764 listenCanceled =
false;
765 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
767 pthread_t listenThread;
770 throw VuoException(
string(
"The composition couldn't start because the runner couldn't create a thread: ") + strerror(ret));
772 dispatch_semaphore_wait(beganListeningSemaphore, DISPATCH_TIME_FOREVER);
773 if (!listenError.empty())
774 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication to listen to the composition: " + listenError);
796 if (! isInCurrentProcess())
797 throw VuoException(
"The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
800 throw VuoException(
"This is not the main thread. Only call this function from the main thread.");
832 if (! isInCurrentProcess())
833 throw VuoException(
"The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
836 throw VuoException(
"This is not the main thread. Only call this function from the main thread.");
850 dispatch_sync(controlQueue, ^{
851 if (stopped || lostContact) {
864 stopBecauseLostContact(e.
what());
878 dispatch_sync(controlQueue, ^{
879 if (stopped || lostContact) {
892 stopBecauseLostContact(e.
what());
920 if (! isUsingCompositionLoader())
921 throw VuoException(
"The runner is not using a composition loader. Only use this function if the composition was constructed with newSeparateProcessRunnerFromDynamicLibrary().");
923 dispatch_sync(controlQueue, ^{
924 if (stopped || lostContact) {
930 if (dylibPath != compositionDylibPath)
932 if (shouldDeleteBinariesWhenFinished)
934 remove(dylibPath.c_str());
937 dylibPath = compositionDylibPath;
951 cleanUpConnections();
953 vector<string> dependencyDylibPathsRemoved = dependencyLibraries->dequeueLibrariesToUnload();
954 vector<string> dependencyDylibPathsAdded = dependencyLibraries->dequeueLibrariesToLoad();
956 unsigned int messageCount = 4 + dependencyDylibPathsAdded.size() + dependencyDylibPathsRemoved.size();
957 zmq_msg_t *messages = (zmq_msg_t *)malloc(messageCount *
sizeof(zmq_msg_t));
963 for (vector<string>::iterator i = dependencyDylibPathsAdded.begin(); i != dependencyDylibPathsAdded.end(); ++i) {
968 for (vector<string>::iterator i = dependencyDylibPathsRemoved.begin(); i != dependencyDylibPathsRemoved.end(); ++i) {
993 stopBecauseLostContact(e.
what());
1013 dispatch_sync(controlQueue, ^{
1025 int timeoutInSeconds = (isInCurrentProcess() ? -1 : 5);
1026 zmq_msg_t messages[3];
1035 __block
bool replyReceived =
false;
1036 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
1046 replyReceived =
true;
1048 while (!replyReceived)
1051 usleep(USEC_PER_SEC / 1000);
1064 cleanUpConnections();
1080 VUserLog(
"The composition couldn't stop because vuoFini() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1088 VUserLog(
"The composition couldn't stop because vuoFiniRuntimeState() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1104 close(runnerReadCompositionWritePipe[1]);
1112 read(runnerReadCompositionWritePipe[0], &buf, 1);
1115 close(runnerReadCompositionWritePipe[0]);
1119 zmq_term(ZMQContext);
1124 dispatch_semaphore_wait(terminatedZMQContextSemaphore, DISPATCH_TIME_FOREVER);
1128 if (shouldDeleteBinariesWhenFinished)
1130 if (isUsingCompositionLoader())
1132 remove(dylibPath.c_str());
1134 else if (isInCurrentProcess())
1136 remove(dylibPath.c_str());
1140 remove(executablePath.c_str());
1144 dependencyLibraries =
nullptr;
1147 dispatch_semaphore_signal(stoppedSemaphore);
1155 void VuoRunner::cleanUpConnections(
void)
1165 vuoSend(
"VuoRunner::ZMQSelfSend", ZMQSelfSend, 0,
nullptr, 0,
false,
nullptr);
1167 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
1168 dispatch_semaphore_signal(endedListeningSemaphore);
1178 dispatch_retain(stoppedSemaphore);
1179 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
1180 dispatch_semaphore_signal(stoppedSemaphore);
1181 dispatch_release(stoppedSemaphore);
1201 const char *valueAsString = json_object_to_json_string_ext(value, JSON_C_TO_STRING_PLAIN);
1203 dispatch_sync(controlQueue, ^{
1204 if (stopped || lostContact) {
1212 zmq_msg_t messages[3];
1221 stopBecauseLostContact(e.
what());
1241 dispatch_sync(controlQueue, ^{
1242 if (stopped || lostContact) {
1250 zmq_msg_t messages[2];
1258 stopBecauseLostContact(e.
what());
1278 __block
string valueAsString;
1279 dispatch_sync(controlQueue, ^{
1280 if (stopped || lostContact) {
1288 zmq_msg_t messages[3];
1294 valueAsString = receiveString(
"null");
1298 stopBecauseLostContact(e.
what());
1301 return json_tokener_parse(valueAsString.c_str());
1319 __block
string valueAsString;
1320 dispatch_sync(controlQueue, ^{
1321 if (stopped || lostContact) {
1329 zmq_msg_t messages[3];
1335 valueAsString = receiveString(
"null");
1339 stopBecauseLostContact(e.
what());
1342 return json_tokener_parse(valueAsString.c_str());
1360 __block
string summary;
1361 dispatch_sync(controlQueue, ^{
1362 if (stopped || lostContact) {
1370 zmq_msg_t messages[2];
1375 summary = receiveString(
"");
1379 stopBecauseLostContact(e.
what());
1400 __block
string summary;
1401 dispatch_sync(controlQueue, ^{
1402 if (stopped || lostContact) {
1410 zmq_msg_t messages[2];
1415 summary = receiveString(
"");
1419 stopBecauseLostContact(e.
what());
1440 __block
string summary;
1441 dispatch_sync(controlQueue, ^{
1442 if (stopped || lostContact) {
1450 zmq_msg_t messages[2];
1455 summary = receiveString(
"");
1459 stopBecauseLostContact(e.
what());
1480 __block
string summary;
1481 dispatch_sync(controlQueue, ^{
1482 if (stopped || lostContact) {
1490 zmq_msg_t messages[2];
1495 summary = receiveString(
"");
1499 stopBecauseLostContact(e.
what());
1517 dispatch_sync(controlQueue, ^{
1518 if (stopped || lostContact) {
1526 zmq_msg_t messages[2];
1534 stopBecauseLostContact(e.
what());
1551 dispatch_sync(controlQueue, ^{
1552 if (stopped || lostContact) {
1560 zmq_msg_t messages[2];
1568 stopBecauseLostContact(e.
what());
1585 dispatch_sync(controlQueue, ^{
1586 if (stopped || lostContact) {
1594 zmq_msg_t messages[1];
1601 stopBecauseLostContact(e.
what());
1620 dispatch_sync(controlQueue, ^{
1621 if (stopped || lostContact) {
1629 zmq_msg_t messages[1];
1636 stopBecauseLostContact(e.
what());
1653 dispatch_sync(controlQueue, ^{
1654 if (stopped || lostContact) {
1662 zmq_msg_t messages[1];
1669 stopBecauseLostContact(e.
what());
1688 dispatch_sync(controlQueue, ^{
1689 if (stopped || lostContact) {
1697 zmq_msg_t messages[1];
1704 stopBecauseLostContact(e.
what());
1725 for (
auto i : portsAndValuesToSet)
1727 string portName = i.first->getName();
1728 if (portName ==
"width")
1729 p->
lastWidth = json_object_get_int64(i.second);
1730 else if (portName ==
"height")
1731 p->
lastHeight = json_object_get_int64(i.second);
1732 else if (portName ==
"image" || portName ==
"startImage")
1735 if (json_object_object_get_ex(i.second,
"pixelsWide", &o))
1736 p->
lastWidth = json_object_get_int64(o);
1737 if (json_object_object_get_ex(i.second,
"pixelsHigh", &o))
1742 dispatch_sync(controlQueue, ^{
1743 if (stopped || lostContact) {
1751 int messageCount = portsAndValuesToSet.size() * 2;
1752 zmq_msg_t messages[messageCount];
1755 for (
auto &kv : portsAndValuesToSet)
1766 stopBecauseLostContact(e.
what());
1780 set<VuoRunner::Port *> portAsSet;
1781 portAsSet.insert(port);
1797 dispatch_sync(controlQueue, ^{
1798 if (stopped || lostContact) {
1804 lastFiredEventSignaled =
false;
1808 size_t messageCount = ports.size() + 1;
1809 zmq_msg_t messages[messageCount];
1822 stopBecauseLostContact(e.
what());
1853 saturating_semaphore_wait(lastFiredEventSemaphore, &lastFiredEventSignaled);
1868 __block
string valueAsString;
1869 dispatch_sync(controlQueue, ^{
1870 if (stopped || lostContact) {
1878 zmq_msg_t messages[2];
1883 valueAsString = receiveString(
"null");
1887 stopBecauseLostContact(e.
what());
1890 return json_tokener_parse(valueAsString.c_str());
1905 __block
string valueAsString;
1906 dispatch_sync(controlQueue, ^{
1907 if (stopped || lostContact) {
1915 zmq_msg_t messages[2];
1920 valueAsString = receiveString(
"null");
1924 stopBecauseLostContact(e.
what());
1929 json_object *js = json_tokener_parse(valueAsString.c_str());
1933 uint64_t actualWidth = 0;
1934 if (json_object_object_get_ex(js,
"pixelsWide", &o))
1935 actualWidth = json_object_get_int64(o);
1936 uint64_t actualHeight = 0;
1937 if (json_object_object_get_ex(js,
"pixelsHigh", &o))
1938 actualHeight = json_object_get_int64(o);
1944 p->vuoImageMakeFromJsonWithDimensions = (Private::vuoImageMakeFromJsonWithDimensionsType)dlsym(RTLD_DEFAULT,
"VuoImage_makeFromJsonWithDimensions");
1945 if (!p->vuoImageMakeFromJsonWithDimensions)
1947 VUserLog(
"Error: Couldn't find VuoImage_makeFromJsonWithDimensions.");
1954 VUserLog(
"Error: Couldn't find VuoImage_getJson.");
1984 vector<VuoRunner::Port *> VuoRunner::getCachedPublishedPorts(
bool input)
1992 if (! arePublishedInputPortsCached)
1994 publishedInputPorts = refreshPublishedPorts(
true);
1995 arePublishedInputPortsCached =
true;
1997 return publishedInputPorts;
2001 if (! arePublishedOutputPortsCached)
2003 publishedOutputPorts = refreshPublishedPorts(
false);
2004 arePublishedOutputPortsCached =
true;
2006 return publishedOutputPorts;
2021 vector<VuoRunner::Port *> VuoRunner::refreshPublishedPorts(
bool input)
2023 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
2024 dispatch_source_t timeout = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
2025 dispatch_source_set_timer(timeout, dispatch_time(DISPATCH_TIME_NOW, 5*NSEC_PER_SEC), NSEC_PER_SEC, NSEC_PER_SEC/10);
2026 dispatch_source_set_event_handler(timeout, ^{
2027 stopBecauseLostContact(
"The connection between the composition and runner timed out when trying to receive the list of published ports");
2028 dispatch_source_cancel(timeout);
2030 dispatch_resume(timeout);
2032 vector<VuoRunner::Port *> ports;
2059 vector<string> names;
2060 vector<string> types;
2061 vector<string> details;
2063 for (
int i = 0; i < 3; ++i)
2067 vector<string> messageStrings = receiveListOfStrings();
2069 names = messageStrings;
2071 types = messageStrings;
2073 details = messageStrings;
2076 for (
size_t i = 0; i < names.size() && i < types.size() && i < details.size(); ++i)
2078 VuoRunner::Port *port =
new Port(names[i], types[i], json_tokener_parse(details[i].c_str()));
2079 ports.push_back(port);
2084 dispatch_source_cancel(timeout);
2085 dispatch_release(timeout);
2089 dispatch_source_cancel(timeout);
2090 dispatch_release(timeout);
2106 return getCachedPublishedPorts(
true);
2120 return getCachedPublishedPorts(
false);
2135 for (vector<VuoRunner::Port *>::iterator i = inputPorts.begin(); i != inputPorts.end(); ++i)
2136 if ((*i)->getName() == name)
2154 for (vector<VuoRunner::Port *>::iterator i = outputPorts.begin(); i != outputPorts.end(); ++i)
2155 if ((*i)->getName() == name)
2172 void VuoRunner::listen()
2176 const char *compositionName = dylibPath.empty() ? executablePath.c_str() : dylibPath.c_str();
2179 if (
const char *lastSlash = strrchr(compositionName,
'/'))
2180 compositionName = lastSlash + 1;
2182 char threadName[MAXTHREADNAMESIZE];
2183 snprintf(threadName, MAXTHREADNAMESIZE,
"org.vuo.runner.telemetry: %s", compositionName);
2184 pthread_setname_np(threadName);
2187 ZMQSelfReceive = zmq_socket(ZMQContext, ZMQ_PAIR);
2189 if (zmq_bind(ZMQSelfReceive,
"inproc://vuo-runner-self") != 0)
2191 listenError = strerror(errno);
2192 dispatch_semaphore_signal(beganListeningSemaphore);
2196 ZMQSelfSend = zmq_socket(ZMQContext, ZMQ_PAIR);
2198 if (zmq_connect(ZMQSelfSend,
"inproc://vuo-runner-self") != 0)
2200 listenError = strerror(errno);
2201 dispatch_semaphore_signal(beganListeningSemaphore);
2206 ZMQTelemetry = zmq_socket(ZMQContext,ZMQ_SUB);
2208 if(zmq_connect(ZMQTelemetry,ZMQTelemetryURL.c_str()))
2210 listenError = strerror(errno);
2211 dispatch_semaphore_signal(beganListeningSemaphore);
2219 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2221 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2223 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2225 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2227 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2229 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2231 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2233 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2235 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2237 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2250 zmq_pollitem_t items[]=
2252 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2256 zmq_poll(items,itemCount,timeout);
2259 dispatch_semaphore_signal(beganListeningSemaphore);
2261 bool pendingCancel =
false;
2262 while(! listenCanceled)
2264 zmq_pollitem_t items[]=
2266 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2267 {ZMQSelfReceive,0,ZMQ_POLLIN,0},
2272 long timeout = pendingCancel ? USEC_PER_SEC / 10 : USEC_PER_SEC;
2273 zmq_poll(items,itemCount,timeout);
2274 if(items[0].revents & ZMQ_POLLIN)
2286 dispatch_sync(delegateQueue, ^{
2296 dispatch_sync(delegateQueue, ^{
2300 free(compositionIdentifier);
2301 free(nodeIdentifier);
2308 dispatch_sync(delegateQueue, ^{
2312 free(compositionIdentifier);
2313 free(nodeIdentifier);
2332 string portDataSummary;
2336 portDataSummary = s;
2340 portDataSummary =
"";
2342 dispatch_sync(delegateQueue, ^{
2349 free(portIdentifier);
2351 free(compositionIdentifier);
2371 string portDataSummary;
2375 portDataSummary = s;
2379 portDataSummary =
"";
2381 dispatch_sync(delegateQueue, ^{
2388 free(portIdentifier);
2390 free(compositionIdentifier);
2404 string portDataSummary;
2408 portDataSummary = s;
2412 portDataSummary =
"";
2416 dispatch_sync(delegateQueue, ^{
2422 free(portIdentifier);
2428 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2435 dispatch_sync(delegateQueue, ^{
2439 free(compositionIdentifier);
2440 free(portIdentifier);
2446 dispatch_sync(delegateQueue, ^{
2455 dispatch_sync(delegateQueue, ^{
2459 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2465 VUserLog(
"Error: Unknown telemetry message type: %d", type);
2469 else if (! listenCanceled)
2471 if (items[1].revents & ZMQ_POLLIN)
2477 pendingCancel =
true;
2480 else if (pendingCancel)
2481 listenCanceled =
true;
2486 listenCanceled =
true;
2487 string dir, file, ext;
2489 stopBecauseLostContact(
"The connection between the composition ('" + file +
"') and runner timed out while listening for telemetry.");
2494 zmq_close(ZMQTelemetry);
2495 ZMQTelemetry = NULL;
2497 zmq_close(ZMQSelfSend);
2499 zmq_close(ZMQSelfReceive);
2500 ZMQSelfReceive = NULL;
2502 dispatch_semaphore_signal(endedListeningSemaphore);
2515 void VuoRunner::vuoControlRequestSend(
enum VuoControlRequest request, zmq_msg_t *messages,
unsigned int messageCount)
2518 vuoSend(
"runner VuoControl",
ZMQControl,request,messages,messageCount,
false,&error);
2537 void VuoRunner::vuoLoaderControlRequestSend(
enum VuoLoaderControlRequest request, zmq_msg_t *messages,
unsigned int messageCount)
2558 void VuoRunner::vuoControlReplyReceive(
enum VuoControlReply expectedReply)
2568 oss << e <<
" (expected " << expectedReply <<
")";
2571 else if (reply != expectedReply)
2574 oss <<
"The runner received the wrong message from the composition (expected " << expectedReply <<
", received " << reply <<
")";
2597 oss << e <<
" (expected " << expectedReply <<
")";
2600 else if (reply != expectedReply)
2603 oss <<
"The runner received the wrong message from the composition loader (expected " << expectedReply <<
", received " << reply <<
")";
2613 string VuoRunner::receiveString(
string fallbackIfNull)
2632 ret = fallbackIfNull;
2640 vector<string> VuoRunner::receiveListOfStrings(
void)
2642 vector<string> messageStrings;
2645 string s = receiveString(
"");
2646 messageStrings.push_back(s);
2648 return messageStrings;
2656 void VuoRunner::saturating_semaphore_signal(dispatch_semaphore_t dsema,
bool *signaled)
2658 if (__sync_bool_compare_and_swap(signaled,
false,
true))
2659 dispatch_semaphore_signal(dsema);
2667 void VuoRunner::saturating_semaphore_wait(dispatch_semaphore_t dsema,
bool *signaled)
2670 dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
2684 bool VuoRunner::isInCurrentProcess(
void)
2686 return executablePath.empty();
2693 bool VuoRunner::isUsingCompositionLoader(
void)
2695 return ! executablePath.empty() && ! dylibPath.empty();
2703 dispatch_sync(delegateQueue, ^{
2704 this->delegate = delegate;
2711 void VuoRunner::stopBecauseLostContact(
string errorMessage)
2713 __block
bool alreadyLostContact;
2714 dispatch_sync(delegateQueue, ^{
2715 alreadyLostContact = lostContact;
2719 if (alreadyLostContact)
2722 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2724 dispatch_sync(delegateQueue, ^{
2729 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2733 if (! isInCurrentProcess())
2739 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2742 zmq_term(ZMQContext);
2744 dispatch_semaphore_signal(terminatedZMQContextSemaphore);
2748 VUserLog(
"%s", errorMessage.c_str());
2758 return compositionPid;
2772 this->details = details;
2823 VuoRunnerDelegate::~VuoRunnerDelegate() { }