18 #include <CoreServices/CoreServices.h>
24 #include <mach-o/loader.h>
25 #include <sys/proc_info.h>
29 static const char *
mainThreadChecker =
"/Applications/Xcode.app/Contents/Developer/usr/lib/libMainThreadChecker.dylib";
39 int flags = fcntl(fd, F_GETFD);
42 VUserLog(
"Error: Couldn't get flags for desciptor %d: %s", fd, strerror(errno));
48 if (fcntl(fd, F_SETFD, flags) != 0)
49 VUserLog(
"Error: Couldn't set FD_CLOEXEC on descriptor %d: %s", fd, strerror(errno));
55 static void __attribute__((constructor)) VuoRunner_init()
59 #pragma clang diagnostic push
60 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
64 #pragma clang diagnostic pop
92 if (timeoutInSeconds >= 0)
94 int timeoutInMilliseconds = timeoutInSeconds * 1000;
95 zmq_setsockopt(zmqSocket, ZMQ_RCVTIMEO, &timeoutInMilliseconds,
sizeof timeoutInMilliseconds);
96 zmq_setsockopt(zmqSocket, ZMQ_SNDTIMEO, &timeoutInMilliseconds,
sizeof timeoutInMilliseconds);
100 zmq_setsockopt(zmqSocket, ZMQ_LINGER, &linger,
sizeof linger);
116 typedef void *(*vuoImageMakeFromJsonWithDimensionsType)(
json_object *,
unsigned int,
unsigned int);
139 vr->executablePath = executablePath;
141 vr->shouldDeleteBinariesWhenFinished = deleteExecutableWhenFinished;
142 vr->sourceDir = sourceDir;
162 const std::shared_ptr<VuoRunningCompositionLibraries> &runningCompositionLibraries,
166 vr->executablePath = compositionLoaderPath;
167 vr->dylibPath = compositionDylibPath;
168 vr->dependencyLibraries = runningCompositionLibraries;
169 vr->sourceDir = sourceDir;
171 vr->shouldDeleteBinariesWhenFinished = deleteDylibsWhenFinished;
172 runningCompositionLibraries->setDeleteResourceLibraries(deleteDylibsWhenFinished);
186 bool deleteDylibWhenFinished)
189 vr->dylibPath = dylibPath;
190 vr->shouldDeleteBinariesWhenFinished = deleteDylibWhenFinished;
191 vr->sourceDir = sourceDir;
202 dispatch_release(stoppedSemaphore);
203 dispatch_release(terminatedZMQContextSemaphore);
204 dispatch_release(beganListeningSemaphore);
205 dispatch_release(endedListeningSemaphore);
206 dispatch_release(lastFiredEventSemaphore);
207 dispatch_release(delegateQueue);
219 VUserLog(
"Error: Only call VuoRunner::setRuntimeChecking() prior to starting the composition.");
229 VuoRunner::VuoRunner(
void)
233 dependencyLibraries = NULL;
234 shouldContinueIfRunnerDies =
false;
235 shouldDeleteBinariesWhenFinished =
false;
236 isRuntimeCheckingEnabled =
false;
240 listenCanceled =
false;
241 stoppedSemaphore = dispatch_semaphore_create(1);
242 terminatedZMQContextSemaphore = dispatch_semaphore_create(0);
243 beganListeningSemaphore = dispatch_semaphore_create(0);
244 endedListeningSemaphore = dispatch_semaphore_create(1);
245 lastFiredEventSemaphore = dispatch_semaphore_create(0);
246 lastFiredEventSignaled =
false;
247 controlQueue = dispatch_queue_create(
"org.vuo.runner.control", NULL);
250 ZMQSelfReceive = NULL;
253 ZMQLoaderControl = NULL;
255 delegateQueue = dispatch_queue_create(
"org.vuo.runner.delegate", NULL);
256 arePublishedInputPortsCached =
false;
257 arePublishedOutputPortsCached =
false;
281 if (isInCurrentProcess())
283 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
284 dispatch_async(queue, ^{
290 usleep(USEC_PER_SEC / 1000);
300 stopBecauseLostContact(e.
what());
330 stopBecauseLostContact(e.
what());
338 void VuoRunner::copyDylibAndChangeId(
string dylibPath,
string &outputDylibPath)
340 string directory, file, extension;
343 const int makeTmpFileExtension = 7;
344 if (file.length() > makeTmpFileExtension)
347 string trimmedFile = file.substr(0, file.length() - makeTmpFileExtension);
353 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
354 }
while (alreadyLoaded);
364 outputDylibPath =
"/tmp/" + hash +
".dylib";
365 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
366 }
while (alreadyLoaded);
369 string newDirectory, newFile, newExtension;
372 if (newFile.length() > file.length())
373 throw VuoException(
"The composition couldn't start because the uniqued dylib name (" + newFile +
") is longer than the original dylib name (" + file +
").");
375 if (copyfile(dylibPath.c_str(), outputDylibPath.c_str(), NULL, COPYFILE_ALL))
376 throw VuoException(
"The composition couldn't start because a copy of the dylib couldn't be made.");
378 FILE *fp = fopen(outputDylibPath.c_str(),
"r+b");
380 throw VuoException(
"The composition couldn't start because the dylib's header couldn't be opened.");
383 struct mach_header_64 header;
384 if (fread(&header,
sizeof(header), 1, fp) != 1)
385 throw VuoException(
"The composition couldn't start because the dylib's header couldn't be read.");
387 if (header.magic != MH_MAGIC_64
388 || header.cputype != CPU_TYPE_X86_64)
389 throw VuoException(
"The composition couldn't start because the dylib isn't an x86_64-only (non-fat) Mach-O binary.");
391 for (
int i = 0; i < header.ncmds; ++i)
393 struct load_command lc;
394 if (fread(&lc,
sizeof(lc), 1, fp) != 1)
395 throw VuoException(
"The composition couldn't start because the dylib's command couldn't be read.");
398 if (lc.cmd == LC_ID_DYLIB)
400 fseek(fp,
sizeof(
struct dylib), SEEK_CUR);
402 size_t nameLength = lc.cmdsize -
sizeof(
struct dylib_command);
403 char *name = (
char *)calloc(nameLength + 1, 1);
404 if (fread(name, nameLength, 1, fp) != 1)
405 throw VuoException(
"The composition couldn't start because the dylib's ID command couldn't be read.");
408 fseek(fp, -nameLength, SEEK_CUR);
409 bzero(name, nameLength);
410 memcpy(name, outputDylibPath.c_str(), min(nameLength, outputDylibPath.length()));
411 fwrite(name, nameLength, 1, fp);
415 fseek(fp, lc.cmdsize-
sizeof(lc), SEEK_CUR);
418 throw VuoException(
"The composition couldn't start because the dylib's LC_ID_DYLIB command couldn't be found.");
430 void VuoRunner::startInternal(
void)
433 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
435 ZMQContext = zmq_init(1);
437 if (isInCurrentProcess())
441 bool alreadyLoaded = dlopen(dylibPath.c_str(), RTLD_NOLOAD);
447 string uniquedDylibPath;
448 copyDylibAndChangeId(dylibPath, uniquedDylibPath);
449 VDebugLog(
"\"%s\" is already loaded, so I duplicated it and changed its LC_ID_DYLIB to \"%s\".", dylibPath.c_str(), uniquedDylibPath.c_str());
451 if (shouldDeleteBinariesWhenFinished)
452 remove(dylibPath.c_str());
454 dylibPath = uniquedDylibPath;
455 shouldDeleteBinariesWhenFinished =
true;
460 throw VuoException(
"The composition couldn't start because the library '" + dylibPath +
"' couldn't be loaded : " + dlerror());
466 throw VuoException(
"The composition couldn't start because vuoInitInProcess() couldn't be found in '" + dylibPath +
"' : " + dlerror());
471 vuoInitInProcess(ZMQContext, ZMQControlURL.c_str(), ZMQTelemetryURL.c_str(),
true, getpid(), -1,
false,
488 string executableName;
489 if (isUsingCompositionLoader())
493 string dir, file, ext;
495 executableName = file;
499 string dir, file, ext;
501 string executableName = file;
503 executableName +=
"." + ext;
505 args.push_back(executableName);
517 args.push_back(
"--vuo-control=" + ZMQControlURL);
518 args.push_back(
"--vuo-telemetry=" + ZMQTelemetryURL);
523 args.push_back(
"--vuo-runner-pid=" + oss.str());
529 args.push_back(
"--vuo-runner-pipe=" + oss.str());
532 if (shouldContinueIfRunnerDies)
533 args.push_back(
"--vuo-continue-if-runner-dies");
535 if (isUsingCompositionLoader())
538 args.push_back(
"--vuo-loader=" + ZMQLoaderControlURL);
541 args.push_back(
"--vuo-pause");
546 throw VuoException(
"The composition couldn't start because a pipe couldn't be opened : " +
string(strerror(errno)));
548 int argSize = args.size();
549 char *argv[argSize + 1];
550 for (
size_t i = 0; i < argSize; ++i)
552 size_t mallocSize = args[i].length() + 1;
553 argv[i] = (
char *)malloc(mallocSize);
554 strlcpy(argv[i], args[i].c_str(), mallocSize);
556 argv[argSize] = NULL;
558 string errorWorkingDirectory =
"The composition couldn't start because the working directory couldn't be changed to '" + sourceDir +
"' : ";
559 string errorExecutable =
"The composition couldn't start because the file '" + executablePath +
"' couldn't be executed : ";
560 string errorFork =
"The composition couldn't start because the composition process couldn't be forked : ";
561 const size_t ERROR_BUFFER_LEN = 256;
562 char errorBuffer[ERROR_BUFFER_LEN];
564 pipe(runnerReadCompositionWritePipe);
566 pid_t childPid = fork();
573 close(runnerReadCompositionWritePipe[0]);
575 pid_t grandchildPid = fork();
576 if (grandchildPid == 0)
583 if (!sourceDir.empty())
585 ret = chdir(sourceDir.c_str());
588 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
589 write(STDERR_FILENO, errorWorkingDirectory.c_str(), errorWorkingDirectory.length());
590 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
591 write(STDERR_FILENO,
"\n", 1);
596 if (isRuntimeCheckingEnabled)
599 ret = execv(executablePath.c_str(), argv);
602 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
603 write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
604 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
605 write(STDERR_FILENO,
"\n", 1);
606 for (
size_t i = 0; i < argSize; ++i)
611 else if (grandchildPid > 0)
615 int ret = write(fd[1], &grandchildPid,
sizeof(pid_t));
616 if (ret !=
sizeof(pid_t))
618 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
619 write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
620 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
621 write(STDERR_FILENO,
"\n", 1);
632 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
633 write(STDERR_FILENO, errorFork.c_str(), errorFork.length());
634 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
635 write(STDERR_FILENO,
"\n", 1);
639 else if (childPid > 0)
648 for (
size_t i = 0; i < argSize; ++i)
651 pid_t grandchildPid = 0;
652 int ret = read(fd[0], &grandchildPid,
sizeof(pid_t));
653 if (ret !=
sizeof(pid_t))
654 throw VuoException(
"The composition couldn't start because the composition process id couldn't be obtained: " +
string(strerror(errno)));
660 ret = waitpid(childPid, &status, 0);
661 }
while (ret == -1 && errno == EINTR);
662 if (WIFEXITED(status) && WEXITSTATUS(status))
663 throw VuoException(
"The composition couldn't start because the parent of the composition process exited with an error.");
664 else if (WIFSIGNALED(status))
665 throw VuoException(
"The composition couldn't start because the parent of the composition process exited abnormally : " +
string(strsignal(WTERMSIG(status))));
667 if (grandchildPid > 0)
668 compositionPid = grandchildPid;
670 throw VuoException(
"The composition couldn't start because the composition process id couldn't be obtained");
674 for (
size_t i = 0; i < argSize; ++i)
677 throw VuoException(
"The composition couldn't start because the parent of the composition process couldn't be forked : " +
string(strerror(errno)));
682 if (isUsingCompositionLoader())
691 if (++numTries == 1000)
692 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication with the composition loader : " +
string(strerror(errno)));
693 usleep(USEC_PER_SEC / 1000);
700 __block
string errorMessage;
701 dispatch_sync(controlQueue, ^{
705 errorMessage = e.
what();
708 if (! errorMessage.empty())
719 pthread_detach(pthread_self());
730 void VuoRunner::setUpConnections(
void)
737 while (zmq_connect(
ZMQControl,ZMQControlURL.c_str()))
739 if (++numTries == 1000)
740 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication to control the composition : " +
string(strerror(errno)));
741 usleep(USEC_PER_SEC / 1000);
745 arePublishedInputPortsCached =
false;
746 arePublishedOutputPortsCached =
false;
747 if (isInCurrentProcess())
749 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
750 __block
string publishedPortsError;
751 dispatch_async(queue, ^{
753 getCachedPublishedPorts(
false);
754 getCachedPublishedPorts(
true);
756 publishedPortsError = e.
what();
759 while (! (arePublishedInputPortsCached && arePublishedOutputPortsCached) )
762 usleep(USEC_PER_SEC / 1000);
764 if (! publishedPortsError.empty())
770 getCachedPublishedPorts(
false);
771 getCachedPublishedPorts(
true);
774 listenCanceled =
false;
775 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
777 pthread_t listenThread;
780 throw VuoException(
string(
"The composition couldn't start because the runner couldn't create a thread: ") + strerror(ret));
782 dispatch_semaphore_wait(beganListeningSemaphore, DISPATCH_TIME_FOREVER);
783 if (!listenError.empty())
784 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication to listen to the composition: " + listenError);
806 if (! isInCurrentProcess())
807 throw VuoException(
"The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
810 throw VuoException(
"This is not the main thread. Only call this function from the main thread.");
842 if (! isInCurrentProcess())
843 throw VuoException(
"The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
846 throw VuoException(
"This is not the main thread. Only call this function from the main thread.");
860 dispatch_sync(controlQueue, ^{
861 if (stopped || lostContact) {
874 stopBecauseLostContact(e.
what());
888 dispatch_sync(controlQueue, ^{
889 if (stopped || lostContact) {
902 stopBecauseLostContact(e.
what());
930 if (! isUsingCompositionLoader())
931 throw VuoException(
"The runner is not using a composition loader. Only use this function if the composition was constructed with newSeparateProcessRunnerFromDynamicLibrary().");
933 dispatch_sync(controlQueue, ^{
934 if (stopped || lostContact) {
940 if (dylibPath != compositionDylibPath)
942 if (shouldDeleteBinariesWhenFinished)
944 remove(dylibPath.c_str());
947 dylibPath = compositionDylibPath;
961 cleanUpConnections();
963 vector<string> dependencyDylibPathsRemoved = dependencyLibraries->dequeueLibrariesToUnload();
964 vector<string> dependencyDylibPathsAdded = dependencyLibraries->dequeueLibrariesToLoad();
966 unsigned int messageCount = 4 + dependencyDylibPathsAdded.size() + dependencyDylibPathsRemoved.size();
967 zmq_msg_t *messages = (zmq_msg_t *)malloc(messageCount *
sizeof(zmq_msg_t));
973 for (vector<string>::iterator i = dependencyDylibPathsAdded.begin(); i != dependencyDylibPathsAdded.end(); ++i) {
978 for (vector<string>::iterator i = dependencyDylibPathsRemoved.begin(); i != dependencyDylibPathsRemoved.end(); ++i) {
1003 stopBecauseLostContact(e.
what());
1023 dispatch_sync(controlQueue, ^{
1035 int timeoutInSeconds = (isInCurrentProcess() ? -1 : 5);
1036 zmq_msg_t messages[3];
1045 __block
bool replyReceived =
false;
1046 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
1056 replyReceived =
true;
1058 while (!replyReceived)
1061 usleep(USEC_PER_SEC / 1000);
1074 cleanUpConnections();
1090 VUserLog(
"The composition couldn't stop because vuoFini() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1098 VUserLog(
"The composition couldn't stop because vuoFiniRuntimeState() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1114 close(runnerReadCompositionWritePipe[1]);
1122 read(runnerReadCompositionWritePipe[0], &buf, 1);
1125 close(runnerReadCompositionWritePipe[0]);
1129 zmq_term(ZMQContext);
1134 dispatch_semaphore_wait(terminatedZMQContextSemaphore, DISPATCH_TIME_FOREVER);
1138 if (shouldDeleteBinariesWhenFinished)
1140 if (isUsingCompositionLoader())
1142 remove(dylibPath.c_str());
1144 else if (isInCurrentProcess())
1146 remove(dylibPath.c_str());
1150 remove(executablePath.c_str());
1154 dependencyLibraries =
nullptr;
1157 dispatch_semaphore_signal(stoppedSemaphore);
1165 void VuoRunner::cleanUpConnections(
void)
1175 vuoSend(
"VuoRunner::ZMQSelfSend", ZMQSelfSend, 0,
nullptr, 0,
false,
nullptr);
1177 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
1178 dispatch_semaphore_signal(endedListeningSemaphore);
1188 dispatch_retain(stoppedSemaphore);
1189 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
1190 dispatch_semaphore_signal(stoppedSemaphore);
1191 dispatch_release(stoppedSemaphore);
1211 const char *valueAsString = json_object_to_json_string_ext(value, JSON_C_TO_STRING_PLAIN);
1213 dispatch_sync(controlQueue, ^{
1214 if (stopped || lostContact) {
1222 zmq_msg_t messages[3];
1231 stopBecauseLostContact(e.
what());
1251 dispatch_sync(controlQueue, ^{
1252 if (stopped || lostContact) {
1260 zmq_msg_t messages[2];
1268 stopBecauseLostContact(e.
what());
1288 __block
string valueAsString;
1289 dispatch_sync(controlQueue, ^{
1290 if (stopped || lostContact) {
1298 zmq_msg_t messages[3];
1304 valueAsString = receiveString(
"null");
1308 stopBecauseLostContact(e.
what());
1311 return json_tokener_parse(valueAsString.c_str());
1329 __block
string valueAsString;
1330 dispatch_sync(controlQueue, ^{
1331 if (stopped || lostContact) {
1339 zmq_msg_t messages[3];
1345 valueAsString = receiveString(
"null");
1349 stopBecauseLostContact(e.
what());
1352 return json_tokener_parse(valueAsString.c_str());
1370 __block
string summary;
1371 dispatch_sync(controlQueue, ^{
1372 if (stopped || lostContact) {
1380 zmq_msg_t messages[2];
1385 summary = receiveString(
"");
1389 stopBecauseLostContact(e.
what());
1410 __block
string summary;
1411 dispatch_sync(controlQueue, ^{
1412 if (stopped || lostContact) {
1420 zmq_msg_t messages[2];
1425 summary = receiveString(
"");
1429 stopBecauseLostContact(e.
what());
1450 __block
string summary;
1451 dispatch_sync(controlQueue, ^{
1452 if (stopped || lostContact) {
1460 zmq_msg_t messages[2];
1465 summary = receiveString(
"");
1469 stopBecauseLostContact(e.
what());
1490 __block
string summary;
1491 dispatch_sync(controlQueue, ^{
1492 if (stopped || lostContact) {
1500 zmq_msg_t messages[2];
1505 summary = receiveString(
"");
1509 stopBecauseLostContact(e.
what());
1527 dispatch_sync(controlQueue, ^{
1528 if (stopped || lostContact) {
1536 zmq_msg_t messages[2];
1544 stopBecauseLostContact(e.
what());
1561 dispatch_sync(controlQueue, ^{
1562 if (stopped || lostContact) {
1570 zmq_msg_t messages[2];
1578 stopBecauseLostContact(e.
what());
1595 dispatch_sync(controlQueue, ^{
1596 if (stopped || lostContact) {
1604 zmq_msg_t messages[1];
1611 stopBecauseLostContact(e.
what());
1630 dispatch_sync(controlQueue, ^{
1631 if (stopped || lostContact) {
1639 zmq_msg_t messages[1];
1646 stopBecauseLostContact(e.
what());
1663 dispatch_sync(controlQueue, ^{
1664 if (stopped || lostContact) {
1672 zmq_msg_t messages[1];
1679 stopBecauseLostContact(e.
what());
1698 dispatch_sync(controlQueue, ^{
1699 if (stopped || lostContact) {
1707 zmq_msg_t messages[1];
1714 stopBecauseLostContact(e.
what());
1735 for (
auto i : portsAndValuesToSet)
1737 string portName = i.first->getName();
1738 if (portName ==
"width")
1739 p->
lastWidth = json_object_get_int64(i.second);
1740 else if (portName ==
"height")
1741 p->
lastHeight = json_object_get_int64(i.second);
1742 else if (portName ==
"image" || portName ==
"startImage")
1745 if (json_object_object_get_ex(i.second,
"pixelsWide", &o))
1746 p->
lastWidth = json_object_get_int64(o);
1747 if (json_object_object_get_ex(i.second,
"pixelsHigh", &o))
1752 dispatch_sync(controlQueue, ^{
1753 if (stopped || lostContact) {
1761 int messageCount = portsAndValuesToSet.size() * 2;
1762 zmq_msg_t messages[messageCount];
1765 for (
auto &kv : portsAndValuesToSet)
1776 stopBecauseLostContact(e.
what());
1790 set<VuoRunner::Port *> portAsSet;
1791 portAsSet.insert(port);
1807 dispatch_sync(controlQueue, ^{
1808 if (stopped || lostContact) {
1814 lastFiredEventSignaled =
false;
1818 size_t messageCount = ports.size() + 1;
1819 zmq_msg_t messages[messageCount];
1832 stopBecauseLostContact(e.
what());
1863 saturating_semaphore_wait(lastFiredEventSemaphore, &lastFiredEventSignaled);
1878 __block
string valueAsString;
1879 dispatch_sync(controlQueue, ^{
1880 if (stopped || lostContact) {
1888 zmq_msg_t messages[2];
1893 valueAsString = receiveString(
"null");
1897 stopBecauseLostContact(e.
what());
1900 return json_tokener_parse(valueAsString.c_str());
1915 __block
string valueAsString;
1916 dispatch_sync(controlQueue, ^{
1917 if (stopped || lostContact) {
1925 zmq_msg_t messages[2];
1930 valueAsString = receiveString(
"null");
1934 stopBecauseLostContact(e.
what());
1939 json_object *js = json_tokener_parse(valueAsString.c_str());
1943 uint64_t actualWidth = 0;
1944 if (json_object_object_get_ex(js,
"pixelsWide", &o))
1945 actualWidth = json_object_get_int64(o);
1946 uint64_t actualHeight = 0;
1947 if (json_object_object_get_ex(js,
"pixelsHigh", &o))
1948 actualHeight = json_object_get_int64(o);
1954 p->vuoImageMakeFromJsonWithDimensions = (Private::vuoImageMakeFromJsonWithDimensionsType)dlsym(RTLD_DEFAULT,
"VuoImage_makeFromJsonWithDimensions");
1955 if (!p->vuoImageMakeFromJsonWithDimensions)
1957 VUserLog(
"Error: Couldn't find VuoImage_makeFromJsonWithDimensions.");
1964 VUserLog(
"Error: Couldn't find VuoImage_getJson.");
1994 vector<VuoRunner::Port *> VuoRunner::getCachedPublishedPorts(
bool input)
2002 if (! arePublishedInputPortsCached)
2004 publishedInputPorts = refreshPublishedPorts(
true);
2005 arePublishedInputPortsCached =
true;
2007 return publishedInputPorts;
2011 if (! arePublishedOutputPortsCached)
2013 publishedOutputPorts = refreshPublishedPorts(
false);
2014 arePublishedOutputPortsCached =
true;
2016 return publishedOutputPorts;
2031 vector<VuoRunner::Port *> VuoRunner::refreshPublishedPorts(
bool input)
2033 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
2034 dispatch_source_t timeout = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
2035 dispatch_source_set_timer(timeout, dispatch_time(DISPATCH_TIME_NOW, 5*NSEC_PER_SEC), NSEC_PER_SEC, NSEC_PER_SEC/10);
2036 dispatch_source_set_event_handler(timeout, ^{
2037 stopBecauseLostContact(
"The connection between the composition and runner timed out when trying to receive the list of published ports");
2038 dispatch_source_cancel(timeout);
2040 dispatch_resume(timeout);
2042 vector<VuoRunner::Port *> ports;
2069 vector<string> names;
2070 vector<string> types;
2071 vector<string> details;
2073 for (
int i = 0; i < 3; ++i)
2077 vector<string> messageStrings = receiveListOfStrings();
2079 names = messageStrings;
2081 types = messageStrings;
2083 details = messageStrings;
2086 for (
size_t i = 0; i < names.size() && i < types.size() && i < details.size(); ++i)
2088 VuoRunner::Port *port =
new Port(names[i], types[i], json_tokener_parse(details[i].c_str()));
2089 ports.push_back(port);
2094 dispatch_source_cancel(timeout);
2095 dispatch_release(timeout);
2099 dispatch_source_cancel(timeout);
2100 dispatch_release(timeout);
2116 return getCachedPublishedPorts(
true);
2130 return getCachedPublishedPorts(
false);
2145 for (vector<VuoRunner::Port *>::iterator i = inputPorts.begin(); i != inputPorts.end(); ++i)
2146 if ((*i)->getName() == name)
2164 for (vector<VuoRunner::Port *>::iterator i = outputPorts.begin(); i != outputPorts.end(); ++i)
2165 if ((*i)->getName() == name)
2182 void VuoRunner::listen()
2186 const char *compositionName = dylibPath.empty() ? executablePath.c_str() : dylibPath.c_str();
2189 if (
const char *lastSlash = strrchr(compositionName,
'/'))
2190 compositionName = lastSlash + 1;
2192 char threadName[MAXTHREADNAMESIZE];
2193 snprintf(threadName, MAXTHREADNAMESIZE,
"org.vuo.runner.telemetry: %s", compositionName);
2194 pthread_setname_np(threadName);
2197 ZMQSelfReceive = zmq_socket(ZMQContext, ZMQ_PAIR);
2199 if (zmq_bind(ZMQSelfReceive,
"inproc://vuo-runner-self") != 0)
2201 listenError = strerror(errno);
2202 dispatch_semaphore_signal(beganListeningSemaphore);
2206 ZMQSelfSend = zmq_socket(ZMQContext, ZMQ_PAIR);
2208 if (zmq_connect(ZMQSelfSend,
"inproc://vuo-runner-self") != 0)
2210 listenError = strerror(errno);
2211 dispatch_semaphore_signal(beganListeningSemaphore);
2216 ZMQTelemetry = zmq_socket(ZMQContext,ZMQ_SUB);
2218 if(zmq_connect(ZMQTelemetry,ZMQTelemetryURL.c_str()))
2220 listenError = strerror(errno);
2221 dispatch_semaphore_signal(beganListeningSemaphore);
2225 const int highWaterMark = 0;
2226 if(zmq_setsockopt(ZMQTelemetry,ZMQ_RCVHWM,&highWaterMark,
sizeof(highWaterMark)))
2228 listenError = strerror(errno);
2229 dispatch_semaphore_signal(beganListeningSemaphore);
2237 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2239 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2241 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2243 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2245 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2247 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2249 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2251 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2253 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2255 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2268 zmq_pollitem_t items[]=
2270 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2274 zmq_poll(items,itemCount,timeout);
2277 dispatch_semaphore_signal(beganListeningSemaphore);
2279 bool pendingCancel =
false;
2280 while(! listenCanceled)
2282 zmq_pollitem_t items[]=
2284 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2285 {ZMQSelfReceive,0,ZMQ_POLLIN,0},
2290 long timeout = pendingCancel ? 100 : 1000;
2291 zmq_poll(items,itemCount,timeout);
2292 if(items[0].revents & ZMQ_POLLIN)
2304 dispatch_sync(delegateQueue, ^{
2314 dispatch_sync(delegateQueue, ^{
2318 free(compositionIdentifier);
2319 free(nodeIdentifier);
2326 dispatch_sync(delegateQueue, ^{
2330 free(compositionIdentifier);
2331 free(nodeIdentifier);
2350 string portDataSummary;
2354 portDataSummary = s;
2358 portDataSummary =
"";
2360 dispatch_sync(delegateQueue, ^{
2367 free(portIdentifier);
2369 free(compositionIdentifier);
2389 string portDataSummary;
2393 portDataSummary = s;
2397 portDataSummary =
"";
2399 dispatch_sync(delegateQueue, ^{
2406 free(portIdentifier);
2408 free(compositionIdentifier);
2422 string portDataSummary;
2426 portDataSummary = s;
2430 portDataSummary =
"";
2434 dispatch_sync(delegateQueue, ^{
2440 free(portIdentifier);
2446 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2453 dispatch_sync(delegateQueue, ^{
2457 free(compositionIdentifier);
2458 free(portIdentifier);
2464 dispatch_sync(delegateQueue, ^{
2473 dispatch_sync(delegateQueue, ^{
2477 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2483 VUserLog(
"Error: Unknown telemetry message type: %d", type);
2487 else if (! listenCanceled)
2489 if (items[1].revents & ZMQ_POLLIN)
2495 pendingCancel =
true;
2498 else if (pendingCancel)
2499 listenCanceled =
true;
2507 VDebugLog(
"zmq_poll timed out, but system is sleeping so I'll try again.");
2509 VDebugLog(
"zmq_poll timed out, but a debugger is attached to the host so I'll try again.");
2512 listenCanceled =
true;
2513 string dir, file, ext;
2515 stopBecauseLostContact(
"The connection between the composition ('" + file +
"') and runner timed out while listening for telemetry.");
2521 zmq_close(ZMQTelemetry);
2522 ZMQTelemetry = NULL;
2524 zmq_close(ZMQSelfSend);
2526 zmq_close(ZMQSelfReceive);
2527 ZMQSelfReceive = NULL;
2529 dispatch_semaphore_signal(endedListeningSemaphore);
2542 void VuoRunner::vuoControlRequestSend(
enum VuoControlRequest request, zmq_msg_t *messages,
unsigned int messageCount)
2545 vuoSend(
"runner VuoControl",
ZMQControl,request,messages,messageCount,
false,&error);
2564 void VuoRunner::vuoLoaderControlRequestSend(
enum VuoLoaderControlRequest request, zmq_msg_t *messages,
unsigned int messageCount)
2585 void VuoRunner::vuoControlReplyReceive(
enum VuoControlReply expectedReply)
2595 oss << e <<
" (expected " << expectedReply <<
")";
2598 else if (reply != expectedReply)
2601 oss <<
"The runner received the wrong message from the composition (expected " << expectedReply <<
", received " << reply <<
")";
2624 oss << e <<
" (expected " << expectedReply <<
")";
2627 else if (reply != expectedReply)
2630 oss <<
"The runner received the wrong message from the composition loader (expected " << expectedReply <<
", received " << reply <<
")";
2640 string VuoRunner::receiveString(
string fallbackIfNull)
2659 ret = fallbackIfNull;
2667 vector<string> VuoRunner::receiveListOfStrings(
void)
2669 vector<string> messageStrings;
2672 string s = receiveString(
"");
2673 messageStrings.push_back(s);
2675 return messageStrings;
2683 void VuoRunner::saturating_semaphore_signal(dispatch_semaphore_t dsema,
bool *signaled)
2685 if (__sync_bool_compare_and_swap(signaled,
false,
true))
2686 dispatch_semaphore_signal(dsema);
2694 void VuoRunner::saturating_semaphore_wait(dispatch_semaphore_t dsema,
bool *signaled)
2697 dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
2711 bool VuoRunner::isInCurrentProcess(
void)
2713 return executablePath.empty();
2720 bool VuoRunner::isUsingCompositionLoader(
void)
2722 return ! executablePath.empty() && ! dylibPath.empty();
2730 dispatch_sync(delegateQueue, ^{
2731 this->delegate = delegate;
2738 void VuoRunner::stopBecauseLostContact(
string errorMessage)
2740 __block
bool alreadyLostContact;
2741 dispatch_sync(delegateQueue, ^{
2742 alreadyLostContact = lostContact;
2746 if (alreadyLostContact)
2749 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2751 dispatch_sync(delegateQueue, ^{
2756 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2760 if (! isInCurrentProcess())
2766 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2769 zmq_term(ZMQContext);
2771 dispatch_semaphore_signal(terminatedZMQContextSemaphore);
2775 VUserLog(
"%s", errorMessage.c_str());
2785 return compositionPid;
2799 this->details = details;
2850 VuoRunnerDelegate::~VuoRunnerDelegate() { }