18#include <CoreServices/CoreServices.h>
24#include <mach-o/loader.h>
25#include <sys/proc_info.h>
29static const char *
mainThreadChecker =
"/Applications/Xcode.app/Contents/Developer/usr/lib/libMainThreadChecker.dylib";
39 int flags = fcntl(fd, F_GETFD);
42 VUserLog(
"Error: Couldn't get flags for desciptor %d: %s", fd, strerror(errno));
48 if (fcntl(fd, F_SETFD, flags) != 0)
49 VUserLog(
"Error: Couldn't set FD_CLOEXEC on descriptor %d: %s", fd, strerror(errno));
55static void __attribute__((constructor)) VuoRunner_init()
59#pragma clang diagnostic push
60#pragma clang diagnostic ignored "-Wdeprecated-declarations"
64#pragma clang diagnostic pop
91 zmq_setsockopt(zmqSocket, ZMQ_LINGER, &linger,
sizeof linger);
107 typedef void *(*vuoImageMakeFromJsonWithDimensionsType)(
json_object *,
unsigned int,
unsigned int);
130 vr->executablePath = executablePath;
132 vr->shouldDeleteBinariesWhenFinished = deleteExecutableWhenFinished;
133 vr->sourceDir = sourceDir;
153 const std::shared_ptr<VuoRunningCompositionLibraries> &runningCompositionLibraries,
157 vr->executablePath = compositionLoaderPath;
158 vr->dylibPath = compositionDylibPath;
159 vr->dependencyLibraries = runningCompositionLibraries;
160 vr->sourceDir = sourceDir;
162 vr->shouldDeleteBinariesWhenFinished = deleteDylibsWhenFinished;
163 runningCompositionLibraries->setDeleteResourceLibraries(deleteDylibsWhenFinished);
177 bool deleteDylibWhenFinished)
180 vr->dylibPath = dylibPath;
181 vr->shouldDeleteBinariesWhenFinished = deleteDylibWhenFinished;
182 vr->sourceDir = sourceDir;
193 dispatch_release(stoppedSemaphore);
194 dispatch_release(terminatedZMQContextSemaphore);
195 dispatch_release(beganListeningSemaphore);
196 dispatch_release(endedListeningSemaphore);
197 dispatch_release(lastFiredEventSemaphore);
198 dispatch_release(delegateQueue);
210 VUserLog(
"Error: Only call VuoRunner::setRuntimeChecking() prior to starting the composition.");
220VuoRunner::VuoRunner(
void)
224 dependencyLibraries = NULL;
225 shouldContinueIfRunnerDies =
false;
226 shouldDeleteBinariesWhenFinished =
false;
227 isRuntimeCheckingEnabled =
false;
231 listenCanceled =
false;
232 stoppedSemaphore = dispatch_semaphore_create(1);
233 terminatedZMQContextSemaphore = dispatch_semaphore_create(0);
234 beganListeningSemaphore = dispatch_semaphore_create(0);
235 endedListeningSemaphore = dispatch_semaphore_create(1);
236 lastFiredEventSemaphore = dispatch_semaphore_create(0);
237 lastFiredEventSignaled =
false;
238 controlQueue = dispatch_queue_create(
"org.vuo.runner.control", NULL);
241 ZMQSelfReceive = NULL;
244 ZMQLoaderControl = NULL;
246 delegateQueue = dispatch_queue_create(
"org.vuo.runner.delegate", NULL);
247 arePublishedInputPortsCached =
false;
248 arePublishedOutputPortsCached =
false;
250 static once_flag sleepHandlersInstalled;
251 call_once(sleepHandlersInstalled, [](){
277 if (isInCurrentProcess())
279 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
280 dispatch_async(queue, ^{
286 usleep(USEC_PER_SEC / 1000);
296 stopBecauseLostContact(e.
what());
326 stopBecauseLostContact(e.
what());
334void VuoRunner::copyDylibAndChangeId(
string dylibPath,
string &outputDylibPath)
336 string directory, file, extension;
339 const int makeTmpFileExtension = 7;
340 if (file.length() > makeTmpFileExtension)
343 string trimmedFile = file.substr(0, file.length() - makeTmpFileExtension);
349 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
350 }
while (alreadyLoaded);
360 outputDylibPath =
"/tmp/" + hash +
".dylib";
361 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
362 }
while (alreadyLoaded);
365 string newDirectory, newFile, newExtension;
368 if (newFile.length() > file.length())
369 throw VuoException(
"The composition couldn't start because the uniqued dylib name (" + newFile +
") is longer than the original dylib name (" + file +
").");
371 if (copyfile(dylibPath.c_str(), outputDylibPath.c_str(), NULL, COPYFILE_ALL))
372 throw VuoException(
"The composition couldn't start because a copy of the dylib couldn't be made.");
374 FILE *fp = fopen(outputDylibPath.c_str(),
"r+b");
376 throw VuoException(
"The composition couldn't start because the dylib's header couldn't be opened.");
379 struct mach_header_64 header;
380 if (fread(&header,
sizeof(header), 1, fp) != 1)
381 throw VuoException(
"The composition couldn't start because the dylib's header couldn't be read.");
383 if (header.magic != MH_MAGIC_64
384 || header.cputype != CPU_TYPE_X86_64)
385 throw VuoException(
"The composition couldn't start because the dylib isn't an x86_64-only (non-fat) Mach-O binary.");
387 for (
int i = 0; i < header.ncmds; ++i)
389 struct load_command lc;
390 if (fread(&lc,
sizeof(lc), 1, fp) != 1)
391 throw VuoException(
"The composition couldn't start because the dylib's command couldn't be read.");
394 if (lc.cmd == LC_ID_DYLIB)
396 fseek(fp,
sizeof(
struct dylib), SEEK_CUR);
398 size_t nameLength = lc.cmdsize -
sizeof(
struct dylib_command);
399 char *name = (
char *)calloc(nameLength + 1, 1);
400 if (fread(name, nameLength, 1, fp) != 1)
401 throw VuoException(
"The composition couldn't start because the dylib's ID command couldn't be read.");
404 fseek(fp, -nameLength, SEEK_CUR);
405 bzero(name, nameLength);
406 memcpy(name, outputDylibPath.c_str(), min(nameLength, outputDylibPath.length()));
407 fwrite(name, nameLength, 1, fp);
411 fseek(fp, lc.cmdsize-
sizeof(lc), SEEK_CUR);
414 throw VuoException(
"The composition couldn't start because the dylib's LC_ID_DYLIB command couldn't be found.");
426void VuoRunner::startInternal(
void)
429 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
431 ZMQContext = zmq_init(1);
433 if (isInCurrentProcess())
437 bool alreadyLoaded = dlopen(dylibPath.c_str(), RTLD_NOLOAD);
443 string uniquedDylibPath;
444 copyDylibAndChangeId(dylibPath, uniquedDylibPath);
445 VDebugLog(
"\"%s\" is already loaded, so I duplicated it and changed its LC_ID_DYLIB to \"%s\".", dylibPath.c_str(), uniquedDylibPath.c_str());
447 if (shouldDeleteBinariesWhenFinished)
448 remove(dylibPath.c_str());
450 dylibPath = uniquedDylibPath;
451 shouldDeleteBinariesWhenFinished =
true;
456 throw VuoException(
"The composition couldn't start because the library '" + dylibPath +
"' couldn't be loaded : " + dlerror());
462 throw VuoException(
"The composition couldn't start because vuoInitInProcess() couldn't be found in '" + dylibPath +
"' : " + dlerror());
467 vuoInitInProcess(ZMQContext, ZMQControlURL.c_str(), ZMQTelemetryURL.c_str(),
true, getpid(), -1,
false,
484 string executableName;
485 if (isUsingCompositionLoader())
489 string dir, file, ext;
491 executableName = file;
495 string dir, file, ext;
497 string executableName = file;
499 executableName +=
"." + ext;
501 args.push_back(executableName);
513 args.push_back(
"--vuo-control=" + ZMQControlURL);
514 args.push_back(
"--vuo-telemetry=" + ZMQTelemetryURL);
519 args.push_back(
"--vuo-runner-pid=" + oss.str());
525 args.push_back(
"--vuo-runner-pipe=" + oss.str());
528 if (shouldContinueIfRunnerDies)
529 args.push_back(
"--vuo-continue-if-runner-dies");
531 if (isUsingCompositionLoader())
534 args.push_back(
"--vuo-loader=" + ZMQLoaderControlURL);
537 args.push_back(
"--vuo-pause");
542 throw VuoException(
"The composition couldn't start because a pipe couldn't be opened : " +
string(strerror(errno)));
544 int argSize = args.size();
545 char *argv[argSize + 1];
546 for (
size_t i = 0; i < argSize; ++i)
548 size_t mallocSize = args[i].length() + 1;
549 argv[i] = (
char *)malloc(mallocSize);
550 strlcpy(argv[i], args[i].c_str(), mallocSize);
552 argv[argSize] = NULL;
554 string errorWorkingDirectory =
"The composition couldn't start because the working directory couldn't be changed to '" + sourceDir +
"' : ";
555 string errorExecutable =
"The composition couldn't start because the file '" + executablePath +
"' couldn't be executed : ";
556 string errorFork =
"The composition couldn't start because the composition process couldn't be forked : ";
557 const size_t ERROR_BUFFER_LEN = 256;
558 char errorBuffer[ERROR_BUFFER_LEN];
560 pipe(runnerReadCompositionWritePipe);
562 pid_t childPid = fork();
569 close(runnerReadCompositionWritePipe[0]);
571 pid_t grandchildPid = fork();
572 if (grandchildPid == 0)
579 if (!sourceDir.empty())
581 ret = chdir(sourceDir.c_str());
584 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
585 write(STDERR_FILENO, errorWorkingDirectory.c_str(), errorWorkingDirectory.length());
586 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
587 write(STDERR_FILENO,
"\n", 1);
592 if (isRuntimeCheckingEnabled)
594 const char *vuoRuntimeChecking = getenv(
"VUO_RUNTIME_CHECKING");
595 if (vuoRuntimeChecking)
596 setenv(
"DYLD_INSERT_LIBRARIES", vuoRuntimeChecking, 1);
601 ret = execv(executablePath.c_str(), argv);
604 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
605 write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
606 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
607 write(STDERR_FILENO,
"\n", 1);
608 for (
size_t i = 0; i < argSize; ++i)
613 else if (grandchildPid > 0)
617 int ret = write(fd[1], &grandchildPid,
sizeof(pid_t));
618 if (ret !=
sizeof(pid_t))
620 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
621 write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
622 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
623 write(STDERR_FILENO,
"\n", 1);
634 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
635 write(STDERR_FILENO, errorFork.c_str(), errorFork.length());
636 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
637 write(STDERR_FILENO,
"\n", 1);
641 else if (childPid > 0)
650 for (
size_t i = 0; i < argSize; ++i)
653 pid_t grandchildPid = 0;
654 int ret = read(fd[0], &grandchildPid,
sizeof(pid_t));
655 if (ret !=
sizeof(pid_t))
656 throw VuoException(
"The composition couldn't start because the composition process id couldn't be obtained: " +
string(strerror(errno)));
662 ret = waitpid(childPid, &status, 0);
663 }
while (ret == -1 && errno == EINTR);
664 if (WIFEXITED(status) && WEXITSTATUS(status))
665 throw VuoException(
"The composition couldn't start because the parent of the composition process exited with an error.");
666 else if (WIFSIGNALED(status))
667 throw VuoException(
"The composition couldn't start because the parent of the composition process exited abnormally : " +
string(strsignal(WTERMSIG(status))));
669 if (grandchildPid > 0)
670 compositionPid = grandchildPid;
672 throw VuoException(
"The composition couldn't start because the composition process id couldn't be obtained");
676 for (
size_t i = 0; i < argSize; ++i)
679 throw VuoException(
"The composition couldn't start because the parent of the composition process couldn't be forked : " +
string(strerror(errno)));
684 if (isUsingCompositionLoader())
693 if (++numTries == 1000)
694 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication with the composition loader : " +
string(strerror(errno)));
695 usleep(USEC_PER_SEC / 1000);
704 __block
string errorMessage;
705 dispatch_sync(controlQueue, ^{
709 errorMessage = e.
what();
712 if (! errorMessage.empty())
723 pthread_detach(pthread_self());
734void VuoRunner::setUpConnections(
void)
741 while (zmq_connect(
ZMQControl,ZMQControlURL.c_str()))
743 if (++numTries == 1000)
744 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication to control the composition : " +
string(strerror(errno)));
745 usleep(USEC_PER_SEC / 1000);
749 arePublishedInputPortsCached =
false;
750 arePublishedOutputPortsCached =
false;
751 if (isInCurrentProcess())
753 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
754 __block
string publishedPortsError;
755 dispatch_async(queue, ^{
757 getCachedPublishedPorts(
false);
758 getCachedPublishedPorts(
true);
760 publishedPortsError = e.
what();
763 while (! (arePublishedInputPortsCached && arePublishedOutputPortsCached) )
766 usleep(USEC_PER_SEC / 1000);
768 if (! publishedPortsError.empty())
774 getCachedPublishedPorts(
false);
775 getCachedPublishedPorts(
true);
778 listenCanceled =
false;
779 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
781 pthread_t listenThread;
784 throw VuoException(
string(
"The composition couldn't start because the runner couldn't create a thread: ") + strerror(ret));
786 dispatch_semaphore_wait(beganListeningSemaphore, DISPATCH_TIME_FOREVER);
787 if (!listenError.empty())
788 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication to listen to the composition: " + listenError);
810 if (! isInCurrentProcess())
811 throw VuoException(
"The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
814 throw VuoException(
"This is not the main thread. Only call this function from the main thread.");
846 if (! isInCurrentProcess())
847 throw VuoException(
"The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
850 throw VuoException(
"This is not the main thread. Only call this function from the main thread.");
864 dispatch_sync(controlQueue, ^{
865 if (stopped || lostContact) {
878 stopBecauseLostContact(e.
what());
892 dispatch_sync(controlQueue, ^{
893 if (stopped || lostContact) {
906 stopBecauseLostContact(e.
what());
934 if (! isUsingCompositionLoader())
935 throw VuoException(
"The runner is not using a composition loader. Only use this function if the composition was constructed with newSeparateProcessRunnerFromDynamicLibrary().");
937 dispatch_sync(controlQueue, ^{
938 if (stopped || lostContact) {
944 if (dylibPath != compositionDylibPath)
946 if (shouldDeleteBinariesWhenFinished)
948 remove(dylibPath.c_str());
951 dylibPath = compositionDylibPath;
965 cleanUpConnections();
967 vector<string> dependencyDylibPathsRemoved = dependencyLibraries->dequeueLibrariesToUnload();
968 vector<string> dependencyDylibPathsAdded = dependencyLibraries->dequeueLibrariesToLoad();
970 unsigned int messageCount = 4 + dependencyDylibPathsAdded.size() + dependencyDylibPathsRemoved.size();
971 zmq_msg_t *messages = (zmq_msg_t *)malloc(messageCount *
sizeof(zmq_msg_t));
977 for (vector<string>::iterator i = dependencyDylibPathsAdded.begin(); i != dependencyDylibPathsAdded.end(); ++i) {
982 for (vector<string>::iterator i = dependencyDylibPathsRemoved.begin(); i != dependencyDylibPathsRemoved.end(); ++i) {
989 VUserLog(
" Replacing composition…");
1007 stopBecauseLostContact(e.
what());
1027 dispatch_sync(controlQueue, ^{
1039 int timeoutInSeconds = (isInCurrentProcess() ? -1 : 5);
1040 zmq_msg_t messages[3];
1049 __block
bool replyReceived =
false;
1050 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
1060 replyReceived =
true;
1062 while (!replyReceived)
1065 usleep(USEC_PER_SEC / 1000);
1078 cleanUpConnections();
1094 VUserLog(
"The composition couldn't stop because vuoFini() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1102 VUserLog(
"The composition couldn't stop because vuoFiniRuntimeState() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1118 close(runnerReadCompositionWritePipe[1]);
1126 read(runnerReadCompositionWritePipe[0], &buf, 1);
1129 close(runnerReadCompositionWritePipe[0]);
1133 zmq_term(ZMQContext);
1138 dispatch_semaphore_wait(terminatedZMQContextSemaphore, DISPATCH_TIME_FOREVER);
1142 if (shouldDeleteBinariesWhenFinished)
1144 if (isUsingCompositionLoader())
1146 remove(dylibPath.c_str());
1148 else if (isInCurrentProcess())
1150 remove(dylibPath.c_str());
1154 remove(executablePath.c_str());
1158 dependencyLibraries =
nullptr;
1161 dispatch_semaphore_signal(stoppedSemaphore);
1169void VuoRunner::cleanUpConnections(
void)
1179 vuoSend(
"VuoRunner::ZMQSelfSend", ZMQSelfSend, 0,
nullptr, 0,
false,
nullptr);
1181 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
1182 dispatch_semaphore_signal(endedListeningSemaphore);
1192 dispatch_retain(stoppedSemaphore);
1193 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
1194 dispatch_semaphore_signal(stoppedSemaphore);
1195 dispatch_release(stoppedSemaphore);
1215 const char *valueAsString = json_object_to_json_string_ext(value, JSON_C_TO_STRING_PLAIN);
1217 dispatch_sync(controlQueue, ^{
1218 if (stopped || lostContact) {
1226 zmq_msg_t messages[3];
1235 stopBecauseLostContact(e.
what());
1255 dispatch_sync(controlQueue, ^{
1256 if (stopped || lostContact) {
1264 zmq_msg_t messages[2];
1272 stopBecauseLostContact(e.
what());
1292 __block
string valueAsString;
1293 dispatch_sync(controlQueue, ^{
1294 if (stopped || lostContact) {
1302 zmq_msg_t messages[3];
1308 valueAsString = receiveString(
"null");
1312 stopBecauseLostContact(e.
what());
1315 return json_tokener_parse(valueAsString.c_str());
1333 __block
string valueAsString;
1334 dispatch_sync(controlQueue, ^{
1335 if (stopped || lostContact) {
1343 zmq_msg_t messages[3];
1349 valueAsString = receiveString(
"null");
1353 stopBecauseLostContact(e.
what());
1356 return json_tokener_parse(valueAsString.c_str());
1374 __block
string summary;
1375 dispatch_sync(controlQueue, ^{
1376 if (stopped || lostContact) {
1384 zmq_msg_t messages[2];
1389 summary = receiveString(
"");
1393 stopBecauseLostContact(e.
what());
1414 __block
string summary;
1415 dispatch_sync(controlQueue, ^{
1416 if (stopped || lostContact) {
1424 zmq_msg_t messages[2];
1429 summary = receiveString(
"");
1433 stopBecauseLostContact(e.
what());
1454 __block
string summary;
1455 dispatch_sync(controlQueue, ^{
1456 if (stopped || lostContact) {
1464 zmq_msg_t messages[2];
1469 summary = receiveString(
"");
1473 stopBecauseLostContact(e.
what());
1494 __block
string summary;
1495 dispatch_sync(controlQueue, ^{
1496 if (stopped || lostContact) {
1504 zmq_msg_t messages[2];
1509 summary = receiveString(
"");
1513 stopBecauseLostContact(e.
what());
1531 dispatch_sync(controlQueue, ^{
1532 if (stopped || lostContact) {
1540 zmq_msg_t messages[2];
1548 stopBecauseLostContact(e.
what());
1565 dispatch_sync(controlQueue, ^{
1566 if (stopped || lostContact) {
1574 zmq_msg_t messages[2];
1582 stopBecauseLostContact(e.
what());
1599 dispatch_sync(controlQueue, ^{
1600 if (stopped || lostContact) {
1608 zmq_msg_t messages[1];
1615 stopBecauseLostContact(e.
what());
1634 dispatch_sync(controlQueue, ^{
1635 if (stopped || lostContact) {
1643 zmq_msg_t messages[1];
1650 stopBecauseLostContact(e.
what());
1667 dispatch_sync(controlQueue, ^{
1668 if (stopped || lostContact) {
1676 zmq_msg_t messages[1];
1683 stopBecauseLostContact(e.
what());
1702 dispatch_sync(controlQueue, ^{
1703 if (stopped || lostContact) {
1711 zmq_msg_t messages[1];
1718 stopBecauseLostContact(e.
what());
1739 for (
auto i : portsAndValuesToSet)
1741 string portName = i.first->getName();
1742 if (portName ==
"width")
1743 p->
lastWidth = json_object_get_int64(i.second);
1744 else if (portName ==
"height")
1745 p->
lastHeight = json_object_get_int64(i.second);
1746 else if (portName ==
"image" || portName ==
"startImage")
1749 if (json_object_object_get_ex(i.second,
"pixelsWide", &o))
1750 p->
lastWidth = json_object_get_int64(o);
1751 if (json_object_object_get_ex(i.second,
"pixelsHigh", &o))
1756 dispatch_sync(controlQueue, ^{
1757 if (stopped || lostContact) {
1765 int messageCount = portsAndValuesToSet.size() * 2;
1766 zmq_msg_t messages[messageCount];
1769 for (
auto &kv : portsAndValuesToSet)
1780 stopBecauseLostContact(e.
what());
1794 set<VuoRunner::Port *> portAsSet;
1795 portAsSet.insert(port);
1811 dispatch_sync(controlQueue, ^{
1812 if (stopped || lostContact) {
1818 lastFiredEventSignaled =
false;
1822 size_t messageCount = ports.size() + 1;
1823 zmq_msg_t messages[messageCount];
1836 stopBecauseLostContact(e.
what());
1867 saturating_semaphore_wait(lastFiredEventSemaphore, &lastFiredEventSignaled);
1882 __block
string valueAsString;
1883 dispatch_sync(controlQueue, ^{
1884 if (stopped || lostContact) {
1892 zmq_msg_t messages[2];
1897 valueAsString = receiveString(
"null");
1901 stopBecauseLostContact(e.
what());
1904 return json_tokener_parse(valueAsString.c_str());
1919 __block
string valueAsString;
1920 dispatch_sync(controlQueue, ^{
1921 if (stopped || lostContact) {
1929 zmq_msg_t messages[2];
1934 valueAsString = receiveString(
"null");
1938 stopBecauseLostContact(e.
what());
1943 json_object *js = json_tokener_parse(valueAsString.c_str());
1947 uint64_t actualWidth = 0;
1948 if (json_object_object_get_ex(js,
"pixelsWide", &o))
1949 actualWidth = json_object_get_int64(o);
1950 uint64_t actualHeight = 0;
1951 if (json_object_object_get_ex(js,
"pixelsHigh", &o))
1952 actualHeight = json_object_get_int64(o);
1958 p->vuoImageMakeFromJsonWithDimensions = (Private::vuoImageMakeFromJsonWithDimensionsType)dlsym(RTLD_SELF,
"VuoImage_makeFromJsonWithDimensions");
1959 if (!p->vuoImageMakeFromJsonWithDimensions)
1961 VUserLog(
"Error: Couldn't find VuoImage_makeFromJsonWithDimensions.");
1968 VUserLog(
"Error: Couldn't find VuoImage_getInterprocessJson.");
1998vector<VuoRunner::Port *> VuoRunner::getCachedPublishedPorts(
bool input)
2006 if (! arePublishedInputPortsCached)
2008 publishedInputPorts = refreshPublishedPorts(
true);
2009 arePublishedInputPortsCached =
true;
2011 return publishedInputPorts;
2015 if (! arePublishedOutputPortsCached)
2017 publishedOutputPorts = refreshPublishedPorts(
false);
2018 arePublishedOutputPortsCached =
true;
2020 return publishedOutputPorts;
2035vector<VuoRunner::Port *> VuoRunner::refreshPublishedPorts(
bool input)
2037 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
2038 dispatch_source_t timeout = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
2039 dispatch_source_set_timer(timeout, dispatch_time(DISPATCH_TIME_NOW, 5*NSEC_PER_SEC), NSEC_PER_SEC, NSEC_PER_SEC/10);
2040 dispatch_source_set_event_handler(timeout, ^{
2041 stopBecauseLostContact(
"The connection between the composition and runner timed out when trying to receive the list of published ports");
2042 dispatch_source_cancel(timeout);
2044 dispatch_resume(timeout);
2046 vector<VuoRunner::Port *> ports;
2073 vector<string> names;
2074 vector<string> types;
2075 vector<string> details;
2077 for (
int i = 0; i < 3; ++i)
2081 vector<string> messageStrings = receiveListOfStrings();
2083 names = messageStrings;
2085 types = messageStrings;
2087 details = messageStrings;
2090 for (
size_t i = 0; i < names.size() && i < types.size() && i < details.size(); ++i)
2092 VuoRunner::Port *port =
new Port(names[i], types[i], json_tokener_parse(details[i].c_str()));
2093 ports.push_back(port);
2098 dispatch_source_cancel(timeout);
2099 dispatch_release(timeout);
2103 dispatch_source_cancel(timeout);
2104 dispatch_release(timeout);
2120 return getCachedPublishedPorts(
true);
2134 return getCachedPublishedPorts(
false);
2149 for (vector<VuoRunner::Port *>::iterator i = inputPorts.begin(); i != inputPorts.end(); ++i)
2150 if ((*i)->getName() == name)
2168 for (vector<VuoRunner::Port *>::iterator i = outputPorts.begin(); i != outputPorts.end(); ++i)
2169 if ((*i)->getName() == name)
2186void VuoRunner::listen()
2190 const char *compositionName = dylibPath.empty() ? executablePath.c_str() : dylibPath.c_str();
2193 if (
const char *lastSlash = strrchr(compositionName,
'/'))
2194 compositionName = lastSlash + 1;
2196 char threadName[MAXTHREADNAMESIZE];
2197 snprintf(threadName, MAXTHREADNAMESIZE,
"org.vuo.runner.telemetry: %s", compositionName);
2198 pthread_setname_np(threadName);
2201 ZMQSelfReceive = zmq_socket(ZMQContext, ZMQ_PAIR);
2203 if (zmq_bind(ZMQSelfReceive,
"inproc://vuo-runner-self") != 0)
2205 listenError = strerror(errno);
2206 dispatch_semaphore_signal(beganListeningSemaphore);
2210 ZMQSelfSend = zmq_socket(ZMQContext, ZMQ_PAIR);
2212 if (zmq_connect(ZMQSelfSend,
"inproc://vuo-runner-self") != 0)
2214 listenError = strerror(errno);
2215 dispatch_semaphore_signal(beganListeningSemaphore);
2220 ZMQTelemetry = zmq_socket(ZMQContext,ZMQ_SUB);
2222 if(zmq_connect(ZMQTelemetry,ZMQTelemetryURL.c_str()))
2224 listenError = strerror(errno);
2225 dispatch_semaphore_signal(beganListeningSemaphore);
2229 const int highWaterMark = 0;
2230 if(zmq_setsockopt(ZMQTelemetry,ZMQ_RCVHWM,&highWaterMark,
sizeof(highWaterMark)))
2232 listenError = strerror(errno);
2233 dispatch_semaphore_signal(beganListeningSemaphore);
2241 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2243 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2245 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2247 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2249 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2251 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2253 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2255 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2257 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2259 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2272 zmq_pollitem_t items[]=
2274 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2278 zmq_poll(items,itemCount,timeout);
2281 dispatch_semaphore_signal(beganListeningSemaphore);
2283 bool pendingCancel =
false;
2284 while(! listenCanceled)
2286 zmq_pollitem_t items[]=
2288 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2289 {ZMQSelfReceive,0,ZMQ_POLLIN,0},
2294 long timeout = pendingCancel ? 100 : 1000;
2295 zmq_poll(items,itemCount,timeout);
2296 if(items[0].revents & ZMQ_POLLIN)
2306 dispatch_sync(delegateQueue, ^{
2316 dispatch_sync(delegateQueue, ^{
2320 free(compositionIdentifier);
2321 free(nodeIdentifier);
2328 dispatch_sync(delegateQueue, ^{
2332 free(compositionIdentifier);
2333 free(nodeIdentifier);
2352 string portDataSummary;
2356 portDataSummary = s;
2360 portDataSummary =
"";
2362 dispatch_sync(delegateQueue, ^{
2369 free(portIdentifier);
2371 free(compositionIdentifier);
2391 string portDataSummary;
2395 portDataSummary = s;
2399 portDataSummary =
"";
2401 dispatch_sync(delegateQueue, ^{
2408 free(portIdentifier);
2410 free(compositionIdentifier);
2424 string portDataSummary;
2428 portDataSummary = s;
2432 portDataSummary =
"";
2436 dispatch_sync(delegateQueue, ^{
2442 free(portIdentifier);
2448 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2455 dispatch_sync(delegateQueue, ^{
2459 free(compositionIdentifier);
2460 free(portIdentifier);
2466 dispatch_sync(delegateQueue, ^{
2475 dispatch_sync(delegateQueue, ^{
2479 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2485 VUserLog(
"Error: Unknown telemetry message type: %d", type);
2489 else if (! listenCanceled)
2491 if (items[1].revents & ZMQ_POLLIN)
2497 pendingCancel =
true;
2500 else if (pendingCancel)
2501 listenCanceled =
true;
2509 VDebugLog(
"zmq_poll timed out, but system is sleeping so I'll try again.");
2511 VDebugLog(
"zmq_poll timed out, but a debugger is attached to the host so I'll try again.");
2514 listenCanceled =
true;
2515 string dir, file, ext;
2517 stopBecauseLostContact(
"The connection between the composition ('" + file +
"') and runner timed out while listening for telemetry.");
2523 zmq_close(ZMQTelemetry);
2524 ZMQTelemetry = NULL;
2526 zmq_close(ZMQSelfSend);
2528 zmq_close(ZMQSelfReceive);
2529 ZMQSelfReceive = NULL;
2531 dispatch_semaphore_signal(endedListeningSemaphore);
2544void VuoRunner::vuoControlRequestSend(
enum VuoControlRequest request, zmq_msg_t *messages,
unsigned int messageCount)
2547 vuoSend(
"runner VuoControl",
ZMQControl,request,messages,messageCount,
false,&error);
2566void VuoRunner::vuoLoaderControlRequestSend(
enum VuoLoaderControlRequest request, zmq_msg_t *messages,
unsigned int messageCount)
2587void VuoRunner::vuoControlReplyReceive(
enum VuoControlReply expectedReply)
2597 oss << e <<
" (expected " << expectedReply <<
")";
2600 else if (reply != expectedReply)
2603 oss <<
"The runner received the wrong message from the composition (expected " << expectedReply <<
", received " << reply <<
")";
2626 oss << e <<
" (expected " << expectedReply <<
")";
2629 else if (reply != expectedReply)
2632 oss <<
"The runner received the wrong message from the composition loader (expected " << expectedReply <<
", received " << reply <<
")";
2642string VuoRunner::receiveString(
string fallbackIfNull)
2661 ret = fallbackIfNull;
2669vector<string> VuoRunner::receiveListOfStrings(
void)
2671 vector<string> messageStrings;
2674 string s = receiveString(
"");
2675 messageStrings.push_back(s);
2677 return messageStrings;
2685void VuoRunner::saturating_semaphore_signal(dispatch_semaphore_t dsema,
bool *signaled)
2687 if (__sync_bool_compare_and_swap(signaled,
false,
true))
2688 dispatch_semaphore_signal(dsema);
2696void VuoRunner::saturating_semaphore_wait(dispatch_semaphore_t dsema,
bool *signaled)
2699 dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
2713bool VuoRunner::isInCurrentProcess(
void)
2715 return executablePath.empty();
2722bool VuoRunner::isUsingCompositionLoader(
void)
2724 return ! executablePath.empty() && ! dylibPath.empty();
2732 dispatch_sync(delegateQueue, ^{
2733 this->delegate = delegate;
2740void VuoRunner::stopBecauseLostContact(
string errorMessage)
2742 __block
bool alreadyLostContact;
2743 dispatch_sync(delegateQueue, ^{
2744 alreadyLostContact = lostContact;
2748 if (alreadyLostContact)
2751 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2753 dispatch_sync(delegateQueue, ^{
2758 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2762 if (! isInCurrentProcess())
2768 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2771 zmq_term(ZMQContext);
2773 dispatch_semaphore_signal(terminatedZMQContextSemaphore);
2777 VUserLog(
"%s", errorMessage.c_str());
2787 return compositionPid;
2801 this->details = details;
2852VuoRunnerDelegate::~VuoRunnerDelegate() { }