18 #include <CoreServices/CoreServices.h>
24 #include <mach-o/loader.h>
25 #include <sys/proc_info.h>
29 static const char *
mainThreadChecker =
"/Applications/Xcode.app/Contents/Developer/usr/lib/libMainThreadChecker.dylib";
38 int flags = fcntl(fd, F_GETFD);
41 VUserLog(
"Error: Couldn't get flags for desciptor %d: %s", fd, strerror(errno));
47 if (fcntl(fd, F_SETFD, flags) != 0)
48 VUserLog(
"Error: Couldn't set FD_CLOEXEC on descriptor %d: %s", fd, strerror(errno));
58 #pragma clang diagnostic push
59 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
63 #pragma clang diagnostic pop
86 if (timeoutInSeconds >= 0)
88 int timeoutInMilliseconds = timeoutInSeconds * 1000;
89 zmq_setsockopt(zmqSocket, ZMQ_RCVTIMEO, &timeoutInMilliseconds,
sizeof timeoutInMilliseconds);
90 zmq_setsockopt(zmqSocket, ZMQ_SNDTIMEO, &timeoutInMilliseconds,
sizeof timeoutInMilliseconds);
94 zmq_setsockopt(zmqSocket, ZMQ_LINGER, &linger,
sizeof linger);
111 vr->executablePath = executablePath;
113 vr->shouldDeleteBinariesWhenFinished = deleteExecutableWhenFinished;
114 vr->sourceDir = sourceDir;
138 vr->executablePath = compositionLoaderPath;
139 vr->dylibPath = compositionDylibPath;
140 vr->dependencyLibraries = runningCompositionLibraries;
141 vr->sourceDir = sourceDir;
143 vr->shouldDeleteBinariesWhenFinished = deleteDylibsWhenFinished;
158 bool deleteDylibWhenFinished)
161 vr->dylibPath = dylibPath;
162 vr->shouldDeleteBinariesWhenFinished = deleteDylibWhenFinished;
163 vr->sourceDir = sourceDir;
174 dispatch_release(stoppedSemaphore);
175 dispatch_release(terminatedZMQContextSemaphore);
176 dispatch_release(beganListeningSemaphore);
177 dispatch_release(endedListeningSemaphore);
178 dispatch_release(lastFiredEventSemaphore);
179 dispatch_release(delegateQueue);
190 VUserLog(
"Error: Only call VuoRunner::setRuntimeChecking() prior to starting the composition.");
200 VuoRunner::VuoRunner(
void)
203 dependencyLibraries = NULL;
204 shouldContinueIfRunnerDies =
false;
205 shouldDeleteBinariesWhenFinished =
false;
206 isRuntimeCheckingEnabled =
false;
210 listenCanceled =
false;
211 stoppedSemaphore = dispatch_semaphore_create(1);
212 terminatedZMQContextSemaphore = dispatch_semaphore_create(0);
213 beganListeningSemaphore = dispatch_semaphore_create(0);
214 endedListeningSemaphore = dispatch_semaphore_create(1);
215 lastFiredEventSemaphore = dispatch_semaphore_create(0);
216 lastFiredEventSignaled =
false;
217 controlQueue = dispatch_queue_create(
"org.vuo.runner.control", NULL);
220 ZMQSelfReceive = NULL;
223 ZMQLoaderControl = NULL;
225 delegateQueue = dispatch_queue_create(
"org.vuo.runner.delegate", NULL);
226 arePublishedInputPortsCached =
false;
227 arePublishedOutputPortsCached =
false;
251 if (isInCurrentProcess())
253 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
254 dispatch_async(queue, ^{
260 usleep(USEC_PER_SEC / 1000);
270 stopBecauseLostContact(e.
what());
300 stopBecauseLostContact(e.
what());
308 void VuoRunner::copyDylibAndChangeId(
string dylibPath,
string &outputDylibPath)
310 string directory, file, extension;
313 const int makeTmpFileExtension = 7;
314 if (file.length() > makeTmpFileExtension)
317 string trimmedFile = file.substr(0, file.length() - makeTmpFileExtension);
323 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
324 }
while (alreadyLoaded);
334 outputDylibPath =
"/tmp/" + hash +
".dylib";
335 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
336 }
while (alreadyLoaded);
339 string newDirectory, newFile, newExtension;
342 if (newFile.length() > file.length())
343 throw VuoException(
"The composition couldn't start because the uniqued dylib name (" + newFile +
") is longer than the original dylib name (" + file +
").");
345 if (copyfile(dylibPath.c_str(), outputDylibPath.c_str(), NULL, COPYFILE_ALL))
346 throw VuoException(
"The composition couldn't start because a copy of the dylib couldn't be made.");
348 FILE *fp = fopen(outputDylibPath.c_str(),
"r+b");
350 throw VuoException(
"The composition couldn't start because the dylib's header couldn't be opened.");
353 struct mach_header_64 header;
354 if (fread(&header,
sizeof(header), 1, fp) != 1)
355 throw VuoException(
"The composition couldn't start because the dylib's header couldn't be read.");
357 if (header.magic != MH_MAGIC_64
358 || header.cputype != CPU_TYPE_X86_64)
359 throw VuoException(
"The composition couldn't start because the dylib isn't an x86_64-only (non-fat) Mach-O binary.");
361 for (
int i = 0; i < header.ncmds; ++i)
363 struct load_command lc;
364 if (fread(&lc,
sizeof(lc), 1, fp) != 1)
365 throw VuoException(
"The composition couldn't start because the dylib's command couldn't be read.");
368 if (lc.cmd == LC_ID_DYLIB)
370 fseek(fp,
sizeof(
struct dylib), SEEK_CUR);
372 size_t nameLength = lc.cmdsize -
sizeof(
struct dylib_command);
373 char *name = (
char *)calloc(nameLength + 1, 1);
374 if (fread(name, nameLength, 1, fp) != 1)
375 throw VuoException(
"The composition couldn't start because the dylib's ID command couldn't be read.");
378 fseek(fp, -nameLength, SEEK_CUR);
379 bzero(name, nameLength);
380 memcpy(name, outputDylibPath.c_str(), min(nameLength, outputDylibPath.length()));
381 fwrite(name, nameLength, 1, fp);
385 fseek(fp, lc.cmdsize-
sizeof(lc), SEEK_CUR);
388 throw VuoException(
"The composition couldn't start because the dylib's LC_ID_DYLIB command couldn't be found.");
400 void VuoRunner::startInternal(
void)
403 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
405 ZMQContext = zmq_init(1);
407 if (isInCurrentProcess())
411 bool alreadyLoaded = dlopen(dylibPath.c_str(), RTLD_NOLOAD);
417 string uniquedDylibPath;
418 copyDylibAndChangeId(dylibPath, uniquedDylibPath);
419 VDebugLog(
"\"%s\" is already loaded, so I duplicated it and changed its LC_ID_DYLIB to \"%s\".", dylibPath.c_str(), uniquedDylibPath.c_str());
421 if (shouldDeleteBinariesWhenFinished)
422 remove(dylibPath.c_str());
424 dylibPath = uniquedDylibPath;
425 shouldDeleteBinariesWhenFinished =
true;
430 throw VuoException(
"The composition couldn't start because the library '" + dylibPath +
"' couldn't be loaded : " + dlerror());
436 throw VuoException(
"The composition couldn't start because vuoInitInProcess() couldn't be found in '" + dylibPath +
"' : " + dlerror());
441 vuoInitInProcess(ZMQContext, ZMQControlURL.c_str(), ZMQTelemetryURL.c_str(),
true, getpid(), -1,
false,
458 string executableName;
459 if (isUsingCompositionLoader())
463 string dir, file, ext;
465 executableName = file;
469 string dir, file, ext;
471 string executableName = file;
473 executableName +=
"." + ext;
475 args.push_back(executableName);
487 args.push_back(
"--vuo-control=" + ZMQControlURL);
488 args.push_back(
"--vuo-telemetry=" + ZMQTelemetryURL);
493 args.push_back(
"--vuo-runner-pid=" + oss.str());
499 args.push_back(
"--vuo-runner-pipe=" + oss.str());
502 if (shouldContinueIfRunnerDies)
503 args.push_back(
"--vuo-continue-if-runner-dies");
505 if (isUsingCompositionLoader())
508 args.push_back(
"--vuo-loader=" + ZMQLoaderControlURL);
511 args.push_back(
"--vuo-pause");
516 throw VuoException(
"The composition couldn't start because a pipe couldn't be opened : " +
string(strerror(errno)));
519 int argSize = args.size();
520 for (
size_t i = 0; i < argSize; ++i)
522 size_t mallocSize = args[i].length() + 1;
523 argv[i] = (
char *)malloc(mallocSize);
524 strlcpy(argv[i], args[i].c_str(), mallocSize);
526 argv[argSize] = NULL;
528 string errorWorkingDirectory =
"The composition couldn't start because the working directory couldn't be changed to '" + sourceDir +
"' : ";
529 string errorExecutable =
"The composition couldn't start because the file '" + executablePath +
"' couldn't be executed : ";
530 string errorFork =
"The composition couldn't start because the composition process couldn't be forked : ";
531 const size_t ERROR_BUFFER_LEN = 256;
532 char errorBuffer[ERROR_BUFFER_LEN];
534 pipe(runnerReadCompositionWritePipe);
536 pid_t childPid = fork();
543 close(runnerReadCompositionWritePipe[0]);
545 pid_t grandchildPid = fork();
546 if (grandchildPid == 0)
553 if (!sourceDir.empty())
555 ret = chdir(sourceDir.c_str());
558 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
559 write(STDERR_FILENO, errorWorkingDirectory.c_str(), errorWorkingDirectory.length());
560 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
561 write(STDERR_FILENO,
"\n", 1);
566 if (isRuntimeCheckingEnabled)
569 ret = execv(executablePath.c_str(), argv);
572 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
573 write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
574 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
575 write(STDERR_FILENO,
"\n", 1);
576 for (
size_t i = 0; i < argSize; ++i)
581 else if (grandchildPid > 0)
585 write(fd[1], &grandchildPid,
sizeof(pid_t));
595 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
596 write(STDERR_FILENO, errorFork.c_str(), errorFork.length());
597 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
598 write(STDERR_FILENO,
"\n", 1);
602 else if (childPid > 0)
611 for (
size_t i = 0; i < argSize; ++i)
615 read(fd[0], &grandchildPid,
sizeof(pid_t));
622 ret = waitpid(childPid, &status, 0);
623 }
while (ret == -1 && errno == EINTR);
624 if (WIFEXITED(status) && WEXITSTATUS(status))
625 throw VuoException(
"The composition couldn't start because the parent of the composition process exited with an error.");
626 else if (WIFSIGNALED(status))
627 throw VuoException(
"The composition couldn't start because the parent of the composition process exited abnormally : " +
string(strsignal(WTERMSIG(status))));
629 if (grandchildPid > 0)
630 compositionPid = grandchildPid;
632 throw VuoException(
"The composition couldn't start because the composition process id couldn't be obtained");
636 for (
size_t i = 0; i < argSize; ++i)
639 throw VuoException(
"The composition couldn't start because the parent of the composition process couldn't be forked : " +
string(strerror(errno)));
644 if (isUsingCompositionLoader())
653 if (++numTries == 1000)
654 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication with the composition loader : " +
string(strerror(errno)));
655 usleep(USEC_PER_SEC / 1000);
662 __block
string errorMessage;
663 dispatch_sync(controlQueue, ^{
667 errorMessage = e.
what();
670 if (! errorMessage.empty())
681 pthread_detach(pthread_self());
692 void VuoRunner::setUpConnections(
void)
699 while (zmq_connect(
ZMQControl,ZMQControlURL.c_str()))
701 if (++numTries == 1000)
702 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication to control the composition : " +
string(strerror(errno)));
703 usleep(USEC_PER_SEC / 1000);
707 arePublishedInputPortsCached =
false;
708 arePublishedOutputPortsCached =
false;
709 if (isInCurrentProcess())
711 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
712 __block
string publishedPortsError;
713 dispatch_async(queue, ^{
715 getCachedPublishedPorts(
false);
716 getCachedPublishedPorts(
true);
718 publishedPortsError = e.
what();
721 while (! (arePublishedInputPortsCached && arePublishedOutputPortsCached) )
724 usleep(USEC_PER_SEC / 1000);
726 if (! publishedPortsError.empty())
732 getCachedPublishedPorts(
false);
733 getCachedPublishedPorts(
true);
736 listenCanceled =
false;
737 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
739 pthread_t listenThread;
742 throw VuoException(
string(
"The composition couldn't start because the runner couldn't create a thread: ") + strerror(ret));
744 dispatch_semaphore_wait(beganListeningSemaphore, DISPATCH_TIME_FOREVER);
745 if (!listenError.empty())
746 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication to listen to the composition: " + listenError);
768 if (! isInCurrentProcess())
769 throw VuoException(
"The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
772 throw VuoException(
"This is not the main thread. Only call this function from the main thread.");
804 if (! isInCurrentProcess())
805 throw VuoException(
"The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
808 throw VuoException(
"This is not the main thread. Only call this function from the main thread.");
822 dispatch_sync(controlQueue, ^{
823 if (stopped || lostContact) {
836 stopBecauseLostContact(e.
what());
850 dispatch_sync(controlQueue, ^{
851 if (stopped || lostContact) {
864 stopBecauseLostContact(e.
what());
892 if (! isUsingCompositionLoader())
893 throw VuoException(
"The runner is not using a composition loader. Only use this function if the composition was constructed with newSeparateProcessRunnerFromDynamicLibrary().");
895 dispatch_sync(controlQueue, ^{
896 if (stopped || lostContact) {
902 if (dylibPath != compositionDylibPath)
904 if (shouldDeleteBinariesWhenFinished)
906 remove(dylibPath.c_str());
909 dylibPath = compositionDylibPath;
923 cleanUpConnections();
928 unsigned int messageCount = 4 + dependencyDylibPathsAdded.size() + dependencyDylibPathsRemoved.size();
929 zmq_msg_t *messages = (zmq_msg_t *)malloc(messageCount *
sizeof(zmq_msg_t));
935 for (vector<string>::iterator i = dependencyDylibPathsAdded.begin(); i != dependencyDylibPathsAdded.end(); ++i) {
940 for (vector<string>::iterator i = dependencyDylibPathsRemoved.begin(); i != dependencyDylibPathsRemoved.end(); ++i) {
965 stopBecauseLostContact(e.
what());
985 dispatch_sync(controlQueue, ^{
997 int timeoutInSeconds = (isInCurrentProcess() ? -1 : 5);
998 zmq_msg_t messages[3];
1007 __block
bool replyReceived =
false;
1008 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
1018 replyReceived =
true;
1020 while (!replyReceived)
1023 usleep(USEC_PER_SEC / 1000);
1036 cleanUpConnections();
1052 VUserLog(
"The composition couldn't stop because vuoFini() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1060 VUserLog(
"The composition couldn't stop because vuoFiniRuntimeState() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1076 close(runnerReadCompositionWritePipe[1]);
1084 read(runnerReadCompositionWritePipe[0], &buf, 1);
1087 close(runnerReadCompositionWritePipe[0]);
1091 zmq_term(ZMQContext);
1096 dispatch_semaphore_wait(terminatedZMQContextSemaphore, DISPATCH_TIME_FOREVER);
1100 if (shouldDeleteBinariesWhenFinished)
1102 if (isUsingCompositionLoader())
1104 remove(dylibPath.c_str());
1106 else if (isInCurrentProcess())
1108 remove(dylibPath.c_str());
1112 remove(executablePath.c_str());
1116 delete dependencyLibraries;
1117 dependencyLibraries = NULL;
1120 dispatch_semaphore_signal(stoppedSemaphore);
1128 void VuoRunner::cleanUpConnections(
void)
1138 vuoSend(
"VuoRunner::ZMQSelfSend", ZMQSelfSend, 0,
nullptr, 0,
false,
nullptr);
1140 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
1141 dispatch_semaphore_signal(endedListeningSemaphore);
1151 dispatch_retain(stoppedSemaphore);
1152 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
1153 dispatch_semaphore_signal(stoppedSemaphore);
1154 dispatch_release(stoppedSemaphore);
1174 const char *valueAsString = json_object_to_json_string_ext(value, JSON_C_TO_STRING_PLAIN);
1176 dispatch_sync(controlQueue, ^{
1177 if (stopped || lostContact) {
1185 zmq_msg_t messages[3];
1194 stopBecauseLostContact(e.
what());
1214 dispatch_sync(controlQueue, ^{
1215 if (stopped || lostContact) {
1223 zmq_msg_t messages[2];
1231 stopBecauseLostContact(e.
what());
1251 __block
string valueAsString;
1252 dispatch_sync(controlQueue, ^{
1253 if (stopped || lostContact) {
1261 zmq_msg_t messages[3];
1267 valueAsString = receiveString(
"null");
1271 stopBecauseLostContact(e.
what());
1274 return json_tokener_parse(valueAsString.c_str());
1292 __block
string valueAsString;
1293 dispatch_sync(controlQueue, ^{
1294 if (stopped || lostContact) {
1302 zmq_msg_t messages[3];
1308 valueAsString = receiveString(
"null");
1312 stopBecauseLostContact(e.
what());
1315 return json_tokener_parse(valueAsString.c_str());
1333 __block
string summary;
1334 dispatch_sync(controlQueue, ^{
1335 if (stopped || lostContact) {
1343 zmq_msg_t messages[2];
1348 summary = receiveString(
"");
1352 stopBecauseLostContact(e.
what());
1373 __block
string summary;
1374 dispatch_sync(controlQueue, ^{
1375 if (stopped || lostContact) {
1383 zmq_msg_t messages[2];
1388 summary = receiveString(
"");
1392 stopBecauseLostContact(e.
what());
1413 __block
string summary;
1414 dispatch_sync(controlQueue, ^{
1415 if (stopped || lostContact) {
1423 zmq_msg_t messages[2];
1428 summary = receiveString(
"");
1432 stopBecauseLostContact(e.
what());
1453 __block
string summary;
1454 dispatch_sync(controlQueue, ^{
1455 if (stopped || lostContact) {
1463 zmq_msg_t messages[2];
1468 summary = receiveString(
"");
1472 stopBecauseLostContact(e.
what());
1490 dispatch_sync(controlQueue, ^{
1491 if (stopped || lostContact) {
1499 zmq_msg_t messages[2];
1507 stopBecauseLostContact(e.
what());
1524 dispatch_sync(controlQueue, ^{
1525 if (stopped || lostContact) {
1533 zmq_msg_t messages[2];
1541 stopBecauseLostContact(e.
what());
1558 dispatch_sync(controlQueue, ^{
1559 if (stopped || lostContact) {
1567 zmq_msg_t messages[1];
1574 stopBecauseLostContact(e.
what());
1593 dispatch_sync(controlQueue, ^{
1594 if (stopped || lostContact) {
1602 zmq_msg_t messages[1];
1609 stopBecauseLostContact(e.
what());
1626 dispatch_sync(controlQueue, ^{
1627 if (stopped || lostContact) {
1635 zmq_msg_t messages[1];
1642 stopBecauseLostContact(e.
what());
1661 dispatch_sync(controlQueue, ^{
1662 if (stopped || lostContact) {
1670 zmq_msg_t messages[1];
1677 stopBecauseLostContact(e.
what());
1697 dispatch_sync(controlQueue, ^{
1698 if (stopped || lostContact) {
1706 int messageCount = portsAndValuesToSet.size() * 2;
1707 zmq_msg_t messages[messageCount];
1710 for (
auto &kv : portsAndValuesToSet)
1721 stopBecauseLostContact(e.
what());
1735 set<VuoRunner::Port *> portAsSet;
1736 portAsSet.insert(port);
1752 dispatch_sync(controlQueue, ^{
1753 if (stopped || lostContact) {
1759 lastFiredEventSignaled =
false;
1763 size_t messageCount = ports.size() + 1;
1764 zmq_msg_t messages[messageCount];
1777 stopBecauseLostContact(e.
what());
1808 saturating_semaphore_wait(lastFiredEventSemaphore, &lastFiredEventSignaled);
1823 __block
string valueAsString;
1824 dispatch_sync(controlQueue, ^{
1825 if (stopped || lostContact) {
1833 zmq_msg_t messages[2];
1838 valueAsString = receiveString(
"null");
1842 stopBecauseLostContact(e.
what());
1845 return json_tokener_parse(valueAsString.c_str());
1860 __block
string valueAsString;
1861 dispatch_sync(controlQueue, ^{
1862 if (stopped || lostContact) {
1870 zmq_msg_t messages[2];
1875 valueAsString = receiveString(
"null");
1879 stopBecauseLostContact(e.
what());
1882 return json_tokener_parse(valueAsString.c_str());
1899 vector<VuoRunner::Port *> VuoRunner::getCachedPublishedPorts(
bool input)
1907 if (! arePublishedInputPortsCached)
1909 publishedInputPorts = refreshPublishedPorts(
true);
1910 arePublishedInputPortsCached =
true;
1912 return publishedInputPorts;
1916 if (! arePublishedOutputPortsCached)
1918 publishedOutputPorts = refreshPublishedPorts(
false);
1919 arePublishedOutputPortsCached =
true;
1921 return publishedOutputPorts;
1936 vector<VuoRunner::Port *> VuoRunner::refreshPublishedPorts(
bool input)
1938 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
1939 dispatch_source_t timeout = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
1940 dispatch_source_set_timer(timeout, dispatch_time(DISPATCH_TIME_NOW, 5*NSEC_PER_SEC), NSEC_PER_SEC, NSEC_PER_SEC/10);
1941 dispatch_source_set_event_handler(timeout, ^{
1942 stopBecauseLostContact(
"The connection between the composition and runner timed out when trying to receive the list of published ports");
1943 dispatch_source_cancel(timeout);
1945 dispatch_resume(timeout);
1947 vector<VuoRunner::Port *> ports;
1974 vector<string> names;
1975 vector<string> types;
1976 vector<string> details;
1978 for (
int i = 0; i < 3; ++i)
1982 vector<string> messageStrings = receiveListOfStrings();
1984 names = messageStrings;
1986 types = messageStrings;
1988 details = messageStrings;
1991 for (
size_t i = 0; i < names.size() && i < types.size() && i < details.size(); ++i)
1993 VuoRunner::Port *port =
new Port(names[i], types[i], json_tokener_parse(details[i].c_str()));
1994 ports.push_back(port);
1999 dispatch_source_cancel(timeout);
2000 dispatch_release(timeout);
2004 dispatch_source_cancel(timeout);
2005 dispatch_release(timeout);
2021 return getCachedPublishedPorts(
true);
2035 return getCachedPublishedPorts(
false);
2050 for (vector<VuoRunner::Port *>::iterator i = inputPorts.begin(); i != inputPorts.end(); ++i)
2051 if ((*i)->getName() == name)
2069 for (vector<VuoRunner::Port *>::iterator i = outputPorts.begin(); i != outputPorts.end(); ++i)
2070 if ((*i)->getName() == name)
2087 void VuoRunner::listen()
2091 const char *compositionName = dylibPath.empty() ? executablePath.c_str() : dylibPath.c_str();
2094 if (
const char *lastSlash = strrchr(compositionName,
'/'))
2095 compositionName = lastSlash + 1;
2097 char threadName[MAXTHREADNAMESIZE];
2098 snprintf(threadName, MAXTHREADNAMESIZE,
"org.vuo.runner.telemetry: %s", compositionName);
2099 pthread_setname_np(threadName);
2102 ZMQSelfReceive = zmq_socket(ZMQContext, ZMQ_PAIR);
2104 if (zmq_bind(ZMQSelfReceive,
"inproc://vuo-runner-self") != 0)
2106 listenError = strerror(errno);
2107 dispatch_semaphore_signal(beganListeningSemaphore);
2111 ZMQSelfSend = zmq_socket(ZMQContext, ZMQ_PAIR);
2113 if (zmq_connect(ZMQSelfSend,
"inproc://vuo-runner-self") != 0)
2115 listenError = strerror(errno);
2116 dispatch_semaphore_signal(beganListeningSemaphore);
2121 ZMQTelemetry = zmq_socket(ZMQContext,ZMQ_SUB);
2123 if(zmq_connect(ZMQTelemetry,ZMQTelemetryURL.c_str()))
2125 listenError = strerror(errno);
2126 dispatch_semaphore_signal(beganListeningSemaphore);
2134 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2136 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2138 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2140 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2142 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2144 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2146 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2148 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2150 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2152 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2165 zmq_pollitem_t items[]=
2167 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2171 zmq_poll(items,itemCount,timeout);
2174 dispatch_semaphore_signal(beganListeningSemaphore);
2176 bool pendingCancel =
false;
2177 while(! listenCanceled)
2179 zmq_pollitem_t items[]=
2181 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2182 {ZMQSelfReceive,0,ZMQ_POLLIN,0},
2187 long timeout = pendingCancel ? USEC_PER_SEC / 10 : USEC_PER_SEC;
2188 zmq_poll(items,itemCount,timeout);
2189 if(items[0].revents & ZMQ_POLLIN)
2201 dispatch_sync(delegateQueue, ^{
2211 dispatch_sync(delegateQueue, ^{
2215 free(compositionIdentifier);
2216 free(nodeIdentifier);
2223 dispatch_sync(delegateQueue, ^{
2227 free(compositionIdentifier);
2228 free(nodeIdentifier);
2247 string portDataSummary;
2251 portDataSummary = s;
2255 portDataSummary =
"";
2257 dispatch_sync(delegateQueue, ^{
2264 free(portIdentifier);
2266 free(compositionIdentifier);
2286 string portDataSummary;
2290 portDataSummary = s;
2294 portDataSummary =
"";
2296 dispatch_sync(delegateQueue, ^{
2303 free(portIdentifier);
2305 free(compositionIdentifier);
2319 string portDataSummary;
2323 portDataSummary = s;
2327 portDataSummary =
"";
2331 dispatch_sync(delegateQueue, ^{
2337 free(portIdentifier);
2343 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2350 dispatch_sync(delegateQueue, ^{
2354 free(compositionIdentifier);
2355 free(portIdentifier);
2361 dispatch_sync(delegateQueue, ^{
2370 dispatch_sync(delegateQueue, ^{
2374 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2381 else if (! listenCanceled)
2383 if (items[1].revents & ZMQ_POLLIN)
2389 pendingCancel =
true;
2392 else if (pendingCancel)
2393 listenCanceled =
true;
2398 listenCanceled =
true;
2399 string dir, file, ext;
2401 stopBecauseLostContact(
"The connection between the composition ('" + file +
"') and runner timed out while listening for telemetry.");
2406 zmq_close(ZMQTelemetry);
2407 ZMQTelemetry = NULL;
2409 zmq_close(ZMQSelfSend);
2411 zmq_close(ZMQSelfReceive);
2412 ZMQSelfReceive = NULL;
2414 dispatch_semaphore_signal(endedListeningSemaphore);
2427 void VuoRunner::vuoControlRequestSend(
enum VuoControlRequest request, zmq_msg_t *messages,
unsigned int messageCount)
2430 vuoSend(
"runner VuoControl",
ZMQControl,request,messages,messageCount,
false,&error);
2449 void VuoRunner::vuoLoaderControlRequestSend(
enum VuoLoaderControlRequest request, zmq_msg_t *messages,
unsigned int messageCount)
2470 void VuoRunner::vuoControlReplyReceive(
enum VuoControlReply expectedReply)
2480 oss << e <<
" (expected " << expectedReply <<
")";
2483 else if (reply != expectedReply)
2486 oss <<
"The runner received the wrong message from the composition (expected " << expectedReply <<
", received " << reply <<
")";
2509 oss << e <<
" (expected " << expectedReply <<
")";
2512 else if (reply != expectedReply)
2515 oss <<
"The runner received the wrong message from the composition loader (expected " << expectedReply <<
", received " << reply <<
")";
2525 string VuoRunner::receiveString(
string fallbackIfNull)
2544 ret = fallbackIfNull;
2552 vector<string> VuoRunner::receiveListOfStrings(
void)
2554 vector<string> messageStrings;
2557 string s = receiveString(
"");
2558 messageStrings.push_back(s);
2560 return messageStrings;
2568 void VuoRunner::saturating_semaphore_signal(dispatch_semaphore_t dsema,
bool *signaled)
2570 if (__sync_bool_compare_and_swap(signaled,
false,
true))
2571 dispatch_semaphore_signal(dsema);
2579 void VuoRunner::saturating_semaphore_wait(dispatch_semaphore_t dsema,
bool *signaled)
2582 dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
2596 bool VuoRunner::isInCurrentProcess(
void)
2598 return executablePath.empty();
2605 bool VuoRunner::isUsingCompositionLoader(
void)
2607 return ! executablePath.empty() && ! dylibPath.empty();
2615 dispatch_sync(delegateQueue, ^{
2616 this->delegate = delegate;
2623 void VuoRunner::stopBecauseLostContact(
string errorMessage)
2625 __block
bool alreadyLostContact;
2626 dispatch_sync(delegateQueue, ^{
2627 alreadyLostContact = lostContact;
2631 if (alreadyLostContact)
2634 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2636 dispatch_sync(delegateQueue, ^{
2641 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2645 if (! isInCurrentProcess())
2651 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2654 zmq_term(ZMQContext);
2656 dispatch_semaphore_signal(terminatedZMQContextSemaphore);
2660 VUserLog(
"%s", errorMessage.c_str());
2670 return compositionPid;
2684 this->details = details;
2735 VuoRunnerDelegate::~VuoRunnerDelegate() { }