19#include <CoreServices/CoreServices.h>
25#include <mach-o/dyld.h>
26#include <mach-o/loader.h>
27#include <mach-o/nlist.h>
28#include <sys/proc_info.h>
33#ifdef VUO_RUNNER_TRACE
37#define VuoRunnerTraceScope() VUserLog("{"); VuoDefer(^{ VuoLog(VuoLog_moduleName, __FILE__, 0, __func__, "}"); })
39#define VuoRunnerTraceScope()
// Absolute path of libMainThreadChecker.dylib inside an Xcode installed at /Applications/Xcode.app.
// NOTE(review): the path is hard-coded; an Xcode installed elsewhere would not be found — confirm how callers handle a missing file.
44static const char *
mainThreadChecker =
"/Applications/Xcode.app/Contents/Developer/usr/lib/libMainThreadChecker.dylib";
54 int flags = fcntl(fd, F_GETFD);
57 VUserLog(
"Error: Couldn't get flags for desciptor %d: %s", fd, strerror(errno));
63 if (fcntl(fd, F_SETFD, flags) != 0)
64 VUserLog(
"Error: Couldn't set FD_CLOEXEC on descriptor %d: %s", fd, strerror(errno));
70static void __attribute__((constructor)) VuoRunner_init()
74#pragma clang diagnostic push
75#pragma clang diagnostic ignored "-Wdeprecated-declarations"
79#pragma clang diagnostic pop
106 zmq_setsockopt(zmqSocket, ZMQ_LINGER, &linger,
sizeof linger);
122 typedef void *(*vuoImageMakeFromJsonWithDimensionsType)(
json_object *,
unsigned int,
unsigned int);
147 vr->executablePath = executablePath;
149 vr->shouldDeleteBinariesWhenFinished = deleteExecutableWhenFinished;
150 vr->sourceDir = sourceDir;
170 const std::shared_ptr<VuoRunningCompositionLibraries> &runningCompositionLibraries,
176 vr->executablePath = compositionLoaderPath;
177 vr->dylibPath = compositionDylibPath;
178 vr->dependencyLibraries = runningCompositionLibraries;
179 vr->sourceDir = sourceDir;
181 vr->shouldDeleteBinariesWhenFinished = deleteDylibsWhenFinished;
182 runningCompositionLibraries->setDeleteResourceLibraries(deleteDylibsWhenFinished);
196 bool deleteDylibWhenFinished)
201 vr->dylibPath = dylibPath;
202 vr->shouldDeleteBinariesWhenFinished = deleteDylibWhenFinished;
203 vr->sourceDir = sourceDir;
216 dispatch_release(stoppedSemaphore);
217 dispatch_release(terminatedZMQContextSemaphore);
218 dispatch_release(beganListeningSemaphore);
219 dispatch_release(endedListeningSemaphore);
220 dispatch_release(lastFiredEventSemaphore);
221 dispatch_release(delegateQueue);
235 VUserLog(
"Error: Only call VuoRunner::setRuntimeChecking() prior to starting the composition.");
// Default constructor: resets the runner's flags, creates its semaphores and dispatch queues,
// and clears its socket pointers and published-port caches.
245VuoRunner::VuoRunner(
void)
249 dependencyLibraries = NULL;
250 shouldContinueIfRunnerDies =
false;
251 shouldDeleteBinariesWhenFinished =
false;
252 isRuntimeCheckingEnabled =
false;
256 listenCanceled =
false;
// Created with a count of 1, so the first wait on it (in startInternal) does not block.
257 stoppedSemaphore = dispatch_semaphore_create(1);
// Created with a count of 0: a waiter blocks until stopBecauseLostContact() signals it.
258 terminatedZMQContextSemaphore = dispatch_semaphore_create(0);
// Created with a count of 0: setUpConnections() blocks on it until listen() signals.
259 beganListeningSemaphore = dispatch_semaphore_create(0);
// Created with a count of 1, so the first wait on it (in setUpConnections) does not block.
260 endedListeningSemaphore = dispatch_semaphore_create(1);
// Used together with lastFiredEventSignaled by the saturating_semaphore_* helpers.
261 lastFiredEventSemaphore = dispatch_semaphore_create(0);
262 lastFiredEventSignaled =
false;
// A NULL attribute yields a serial queue; control requests are run on it with dispatch_sync.
263 controlQueue = dispatch_queue_create(
"org.vuo.runner.control", NULL);
266 ZMQSelfReceive = NULL;
269 ZMQLoaderControl = NULL;
// Serial queue on which the delegate pointer and lostContact flag are read and written.
271 delegateQueue = dispatch_queue_create(
"org.vuo.runner.delegate", NULL);
// Neither published-port list has been fetched yet.
272 arePublishedInputPortsCached =
false;
273 arePublishedOutputPortsCached =
false;
// Function-local static once_flag: the lambda runs at most once per process, no matter how many runners are constructed.
// NOTE(review): judging by the flag's name this installs sleep handlers; the lambda body is not visible here — confirm.
275 static once_flag sleepHandlersInstalled;
276 call_once(sleepHandlersInstalled, [](){
304 if (isInCurrentProcess())
306 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
307 dispatch_async(queue, ^{
313 usleep(USEC_PER_SEC / 1000);
323 stopBecauseLostContact(e.
what());
355 stopBecauseLostContact(e.
what());
363void VuoRunner::copyDylibAndChangeId(
string dylibPath,
string &outputDylibPath)
365 string directory, file, extension;
368 const int makeTmpFileExtension = 7;
369 if (file.length() > makeTmpFileExtension)
372 string trimmedFile = file.substr(0, file.length() - makeTmpFileExtension);
378 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
379 }
while (alreadyLoaded);
389 outputDylibPath =
"/tmp/" + hash +
".dylib";
390 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
391 }
while (alreadyLoaded);
394 string newDirectory, newFile, newExtension;
397 if (newFile.length() > file.length())
398 throw VuoException(
"The composition couldn't start because the uniqued dylib name (" + newFile +
") is longer than the original dylib name (" + file +
").");
400 if (copyfile(dylibPath.c_str(), outputDylibPath.c_str(), NULL, COPYFILE_ALL))
401 throw VuoException(
"The composition couldn't start because a copy of the dylib couldn't be made.");
403 FILE *fp = fopen(outputDylibPath.c_str(),
"r+b");
405 throw VuoException(
"The composition couldn't start because the dylib's header couldn't be opened.");
407 __block
bool fpClosed =
false;
408 VuoDefer(^{
if (! fpClosed) fclose(fp); });
410 struct mach_header_64 header;
411 if (fread(&header,
sizeof(header), 1, fp) != 1)
412 throw VuoException(
"The composition couldn't start because the dylib's header couldn't be read.");
414 if (header.magic != MH_MAGIC_64)
415 throw VuoException(
"The composition couldn't start because the dylib isn't a 64-bit Mach-O binary.");
417 for (
int i = 0; i < header.ncmds; ++i)
419 struct load_command lc;
420 if (fread(&lc,
sizeof(lc), 1, fp) != 1)
421 throw VuoException(
"The composition couldn't start because the dylib's command couldn't be read.");
424 if (lc.cmd == LC_ID_DYLIB)
426 fseek(fp,
sizeof(
struct dylib), SEEK_CUR);
428 size_t nameLength = lc.cmdsize -
sizeof(
struct dylib_command);
429 char *name = (
char *)calloc(nameLength + 1, 1);
430 if (fread(name, nameLength, 1, fp) != 1)
431 throw VuoException(
"The composition couldn't start because the dylib's ID command couldn't be read.");
434 fseek(fp, -nameLength, SEEK_CUR);
435 bzero(name, nameLength);
436 memcpy(name, outputDylibPath.c_str(), min(nameLength, outputDylibPath.length()));
437 fwrite(name, nameLength, 1, fp);
446 vector<string> environment;
449 environment = {
"CODESIGN_ALLOCATE=" + codesignAllocatePath };
455 catch (std::exception &e)
457 VUserLog(
"Warning: Couldn't code-sign the renamed dylib: %s", e.what());
463 fseek(fp, lc.cmdsize-
sizeof(lc), SEEK_CUR);
466 throw VuoException(
"The composition couldn't start because the dylib's LC_ID_DYLIB command couldn't be found.");
474 if (header->magic != MH_MAGIC_64)
477 struct load_command *lc = (
struct load_command *)((
char *)header +
sizeof(
struct mach_header_64));
478 int64_t maxExtent = 0;
479 for (
int i = 0; i < header->ncmds; ++i)
481 if (lc->cmd == LC_SEGMENT_64)
483 struct segment_command_64 *seg = (
struct segment_command_64 *)lc;
484 maxExtent =
MAX(maxExtent, seg->vmaddr + seg->vmsize);
486 else if (lc->cmd == LC_CODE_SIGNATURE
487 || lc->cmd == LC_SEGMENT_SPLIT_INFO
488 || lc->cmd == LC_FUNCTION_STARTS
489 || lc->cmd == LC_DATA_IN_CODE
490 || lc->cmd == LC_DYLIB_CODE_SIGN_DRS
491 || lc->cmd == LC_LINKER_OPTIMIZATION_HINT
492 || lc->cmd == LC_DYLD_EXPORTS_TRIE
493 || lc->cmd == LC_DYLD_CHAINED_FIXUPS)
495 struct linkedit_data_command *data = (
struct linkedit_data_command *)lc;
496 maxExtent =
MAX(maxExtent, data->dataoff + data->datasize);
498 else if (lc->cmd == LC_SYMTAB)
500 struct symtab_command *symtab = (
struct symtab_command *)lc;
501 maxExtent =
MAX(maxExtent, symtab->symoff + symtab->nsyms *
sizeof(
struct nlist_64));
502 maxExtent =
MAX(maxExtent, symtab->stroff + symtab->strsize);
506 lc = (
struct load_command *)((
char *)lc + lc->cmdsize);
522 if (mh->flags & MH_DYLIB_IN_CACHE)
525 const char *homeZ = getenv(
"HOME");
530 Dl_info info{
"",
nullptr,
"",
nullptr};
531 dladdr((
void *)vmaddr_slide, &info);
532 string filename{info.dli_fname};
538 VuoLog(
VuoLog_moduleName, __FILE__, __LINE__, func,
"%16lx - %16lx (%lld bytes) %s", vmaddr_slide, (intptr_t)((
char *)vmaddr_slide + size), size, filename.c_str());
566void VuoRunner::startInternal(
void)
569 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
571 ZMQContext = zmq_init(1);
573 if (isInCurrentProcess())
577 static once_flag dylibLoggerInitialized;
578 call_once(dylibLoggerInitialized, [](){
586 bool alreadyLoaded = dlopen(dylibPath.c_str(), RTLD_NOLOAD);
592 string uniquedDylibPath;
593 copyDylibAndChangeId(dylibPath, uniquedDylibPath);
594 VDebugLog(
"\"%s\" is already loaded, so I duplicated it and changed its LC_ID_DYLIB to \"%s\".", dylibPath.c_str(), uniquedDylibPath.c_str());
596 if (shouldDeleteBinariesWhenFinished)
597 remove(dylibPath.c_str());
599 dylibPath = uniquedDylibPath;
600 shouldDeleteBinariesWhenFinished =
true;
605 throw VuoException(
"The composition couldn't start because the library '" + dylibPath +
"' couldn't be loaded : " + dlerror());
611 throw VuoException(
"The composition couldn't start because vuoInitInProcess() couldn't be found in '" + dylibPath +
"' : " + dlerror());
616 vuoInitInProcess(ZMQContext, ZMQControlURL.c_str(), ZMQTelemetryURL.c_str(),
true, getpid(), -1,
false,
633 string executableName;
634 if (isUsingCompositionLoader())
638 string dir, file, ext;
640 executableName = file;
644 string dir, file, ext;
646 string executableName = file;
648 executableName +=
"." + ext;
650 args.push_back(executableName);
662 args.push_back(
"--vuo-control=" + ZMQControlURL);
663 args.push_back(
"--vuo-telemetry=" + ZMQTelemetryURL);
668 args.push_back(
"--vuo-runner-pid=" + oss.str());
674 args.push_back(
"--vuo-runner-pipe=" + oss.str());
677 if (shouldContinueIfRunnerDies)
678 args.push_back(
"--vuo-continue-if-runner-dies");
680 if (isUsingCompositionLoader())
683 args.push_back(
"--vuo-loader=" + ZMQLoaderControlURL);
686 args.push_back(
"--vuo-pause");
691 throw VuoException(
"The composition couldn't start because a pipe couldn't be opened : " +
string(strerror(errno)));
693 int argSize = args.size();
694 char *argv[argSize + 1];
695 for (
size_t i = 0; i < argSize; ++i)
697 size_t mallocSize = args[i].length() + 1;
698 argv[i] = (
char *)malloc(mallocSize);
699 strlcpy(argv[i], args[i].c_str(), mallocSize);
701 argv[argSize] = NULL;
703 string errorWorkingDirectory =
"The composition couldn't start because the working directory couldn't be changed to '" + sourceDir +
"' : ";
704 string errorExecutable =
"The composition couldn't start because the file '" + executablePath +
"' couldn't be executed : ";
705 string errorFork =
"The composition couldn't start because the composition process couldn't be forked : ";
706 const size_t ERROR_BUFFER_LEN = 256;
707 char errorBuffer[ERROR_BUFFER_LEN];
709 pipe(runnerReadCompositionWritePipe);
711 pid_t childPid = fork();
718 close(runnerReadCompositionWritePipe[0]);
720 pid_t grandchildPid = fork();
721 if (grandchildPid == 0)
728 if (!sourceDir.empty())
730 ret = chdir(sourceDir.c_str());
733 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
734 write(STDERR_FILENO, errorWorkingDirectory.c_str(), errorWorkingDirectory.length());
735 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
736 write(STDERR_FILENO,
"\n", 1);
741 if (isRuntimeCheckingEnabled)
743 const char *vuoRuntimeChecking = getenv(
"VUO_RUNTIME_CHECKING");
744 if (vuoRuntimeChecking)
745 setenv(
"DYLD_INSERT_LIBRARIES", vuoRuntimeChecking, 1);
750 ret = execv(executablePath.c_str(), argv);
753 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
754 write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
755 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
756 write(STDERR_FILENO,
"\n", 1);
757 for (
size_t i = 0; i < argSize; ++i)
762 else if (grandchildPid > 0)
766 int ret = write(fd[1], &grandchildPid,
sizeof(pid_t));
767 if (ret !=
sizeof(pid_t))
769 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
770 write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
771 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
772 write(STDERR_FILENO,
"\n", 1);
783 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
784 write(STDERR_FILENO, errorFork.c_str(), errorFork.length());
785 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
786 write(STDERR_FILENO,
"\n", 1);
790 else if (childPid > 0)
799 for (
size_t i = 0; i < argSize; ++i)
802 pid_t grandchildPid = 0;
803 int ret = read(fd[0], &grandchildPid,
sizeof(pid_t));
804 if (ret !=
sizeof(pid_t))
805 throw VuoException(
"The composition couldn't start because the composition process id couldn't be obtained: " +
string(strerror(errno)));
811 ret = waitpid(childPid, &status, 0);
812 }
while (ret == -1 && errno == EINTR);
813 if (WIFEXITED(status) && WEXITSTATUS(status))
814 throw VuoException(
"The composition couldn't start because the parent of the composition process exited with an error.");
815 else if (WIFSIGNALED(status))
816 throw VuoException(
"The composition couldn't start because the parent of the composition process exited abnormally : " +
string(strsignal(WTERMSIG(status))));
818 if (grandchildPid > 0)
819 compositionPid = grandchildPid;
821 throw VuoException(
"The composition couldn't start because the composition process id couldn't be obtained");
825 for (
size_t i = 0; i < argSize; ++i)
828 throw VuoException(
"The composition couldn't start because the parent of the composition process couldn't be forked : " +
string(strerror(errno)));
833 if (isUsingCompositionLoader())
842 if (++numTries == 1000)
843 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication with the composition loader : " +
string(strerror(errno)));
844 usleep(USEC_PER_SEC / 1000);
853 __block
string errorMessage;
854 dispatch_sync(controlQueue, ^{
858 errorMessage = e.
what();
861 if (! errorMessage.empty())
872 pthread_detach(pthread_self());
// Connects the control socket, fetches the published port lists, and starts the listening thread.
// Throws VuoException if the control connection, thread creation, or listener start-up fails.
883void VuoRunner::setUpConnections(
void)
// zmq_connect returns non-zero on failure; keep retrying with a 1 ms pause between attempts,
// and give up with an exception once numTries reaches 1000.
890 while (zmq_connect(
ZMQControl,ZMQControlURL.c_str()))
892 if (++numTries == 1000)
893 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication to control the composition : " +
string(strerror(errno)));
894 usleep(USEC_PER_SEC / 1000);
// Invalidate both caches so the getCachedPublishedPorts() calls below query the composition again.
898 arePublishedInputPortsCached =
false;
899 arePublishedOutputPortsCached =
false;
900 if (isInCurrentProcess())
// In-process case: fetch the port lists on a global queue while this thread polls the cache flags.
902 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
// __block so the asynchronous block can write the error text back to this scope.
903 __block
string publishedPortsError;
904 dispatch_async(queue, ^{
906 getCachedPublishedPorts(
false);
907 getCachedPublishedPorts(
true);
909 publishedPortsError = e.
what();
// Poll roughly once per millisecond until both lists are cached.
// NOTE(review): the loop exits only on the cache flags; whether the lines missing from this view also break out when publishedPortsError is set needs confirming.
912 while (! (arePublishedInputPortsCached && arePublishedOutputPortsCached) )
915 usleep(USEC_PER_SEC / 1000);
917 if (! publishedPortsError.empty())
// Otherwise the port lists are fetched directly on this thread.
923 getCachedPublishedPorts(
false);
924 getCachedPublishedPorts(
true);
927 listenCanceled =
false;
// Take endedListeningSemaphore; listen() signals it again just before it returns.
928 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
930 pthread_t listenThread;
933 throw VuoException(
string(
"The composition couldn't start because the runner couldn't create a thread: ") + strerror(ret));
// Block until listen() reports that it either started polling or failed (failure text is left in listenError).
935 dispatch_semaphore_wait(beganListeningSemaphore, DISPATCH_TIME_FOREVER);
936 if (!listenError.empty())
937 throw VuoException(
"The composition couldn't start because the runner couldn't establish communication to listen to the composition: " + listenError);
961 if (! isInCurrentProcess())
962 throw VuoException(
"The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
965 throw VuoException(
"This is not the main thread. Only call this function from the main thread.");
999 if (! isInCurrentProcess())
1000 throw VuoException(
"The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
1003 throw VuoException(
"This is not the main thread. Only call this function from the main thread.");
1019 dispatch_sync(controlQueue, ^{
1020 if (stopped || lostContact) {
1033 stopBecauseLostContact(e.
what());
1049 dispatch_sync(controlQueue, ^{
1050 if (stopped || lostContact) {
1063 stopBecauseLostContact(e.
what());
1093 if (! isUsingCompositionLoader())
1094 throw VuoException(
"The runner is not using a composition loader. Only use this function if the composition was constructed with newSeparateProcessRunnerFromDynamicLibrary().");
1096 dispatch_sync(controlQueue, ^{
1097 if (stopped || lostContact) {
1103 if (dylibPath != compositionDylibPath)
1105 if (shouldDeleteBinariesWhenFinished)
1107 remove(dylibPath.c_str());
1110 dylibPath = compositionDylibPath;
1124 cleanUpConnections();
1126 vector<string> dependencyDylibPathsRemoved = dependencyLibraries->dequeueLibrariesToUnload();
1127 vector<string> dependencyDylibPathsAdded = dependencyLibraries->dequeueLibrariesToLoad();
1129 unsigned int messageCount = 4 + dependencyDylibPathsAdded.size() + dependencyDylibPathsRemoved.size();
1130 zmq_msg_t *messages = (zmq_msg_t *)malloc(messageCount *
sizeof(zmq_msg_t));
1136 for (vector<string>::iterator i = dependencyDylibPathsAdded.begin(); i != dependencyDylibPathsAdded.end(); ++i) {
1141 for (vector<string>::iterator i = dependencyDylibPathsRemoved.begin(); i != dependencyDylibPathsRemoved.end(); ++i) {
1148 VUserLog(
" Replacing composition…");
1166 stopBecauseLostContact(e.
what());
1188 dispatch_sync(controlQueue, ^{
1200 int timeoutInSeconds = (isInCurrentProcess() ? -1 : 5);
1201 zmq_msg_t messages[2];
1209 __block
bool replyReceived =
false;
1210 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
1220 replyReceived =
true;
1222 while (!replyReceived)
1225 usleep(USEC_PER_SEC / 1000);
1238 cleanUpConnections();
1254 VUserLog(
"The composition couldn't stop because vuoFini() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1262 VUserLog(
"The composition couldn't stop because vuoFiniRuntimeState() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1278 close(runnerReadCompositionWritePipe[1]);
1286 read(runnerReadCompositionWritePipe[0], &buf, 1);
1289 close(runnerReadCompositionWritePipe[0]);
1293 zmq_term(ZMQContext);
1298 dispatch_semaphore_wait(terminatedZMQContextSemaphore, DISPATCH_TIME_FOREVER);
1302 if (shouldDeleteBinariesWhenFinished)
1304 if (isUsingCompositionLoader())
1306 remove(dylibPath.c_str());
1308 else if (isInCurrentProcess())
1310 remove(dylibPath.c_str());
1314 remove(executablePath.c_str());
1318 dependencyLibraries =
nullptr;
1321 dispatch_semaphore_signal(stoppedSemaphore);
// Tells the listening thread to finish and waits until it has done so.
1329void VuoRunner::cleanUpConnections(
void)
// Send an empty message (type 0, no parts) on ZMQSelfSend; listen() polls the paired ZMQSelfReceive socket.
1339 vuoSend(
"VuoRunner::ZMQSelfSend", ZMQSelfSend, 0,
nullptr, 0,
false,
nullptr);
// Block until listen() signals endedListeningSemaphore, then signal it again so its count is restored for the next waiter.
1341 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
1342 dispatch_semaphore_signal(endedListeningSemaphore);
1354 dispatch_retain(stoppedSemaphore);
1355 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
1356 dispatch_semaphore_signal(stoppedSemaphore);
1357 dispatch_release(stoppedSemaphore);
1379 const char *valueAsString = json_object_to_json_string_ext(value, JSON_C_TO_STRING_PLAIN);
1381 dispatch_sync(controlQueue, ^{
1382 if (stopped || lostContact) {
1390 zmq_msg_t messages[3];
1399 stopBecauseLostContact(e.
what());
1421 dispatch_sync(controlQueue, ^{
1422 if (stopped || lostContact) {
1430 zmq_msg_t messages[2];
1438 stopBecauseLostContact(e.
what());
1463 __block
string valueAsString;
1464 dispatch_sync(controlQueue, ^{
1465 if (stopped || lostContact) {
1473 zmq_msg_t messages[3];
1479 valueAsString = receiveString(
"null");
1483 stopBecauseLostContact(e.
what());
1486 return json_tokener_parse(valueAsString.c_str());
1509 __block
string valueAsString;
1510 dispatch_sync(controlQueue, ^{
1511 if (stopped || lostContact) {
1519 zmq_msg_t messages[3];
1525 valueAsString = receiveString(
"null");
1529 stopBecauseLostContact(e.
what());
1532 return json_tokener_parse(valueAsString.c_str());
1552 __block
string summary;
1553 dispatch_sync(controlQueue, ^{
1554 if (stopped || lostContact) {
1562 zmq_msg_t messages[2];
1567 summary = receiveString(
"");
1571 stopBecauseLostContact(e.
what());
1594 __block
string summary;
1595 dispatch_sync(controlQueue, ^{
1596 if (stopped || lostContact) {
1604 zmq_msg_t messages[2];
1609 summary = receiveString(
"");
1613 stopBecauseLostContact(e.
what());
1636 __block
string summary;
1637 dispatch_sync(controlQueue, ^{
1638 if (stopped || lostContact) {
1646 zmq_msg_t messages[2];
1651 summary = receiveString(
"");
1655 stopBecauseLostContact(e.
what());
1678 __block
string summary;
1679 dispatch_sync(controlQueue, ^{
1680 if (stopped || lostContact) {
1688 zmq_msg_t messages[2];
1693 summary = receiveString(
"");
1697 stopBecauseLostContact(e.
what());
1717 dispatch_sync(controlQueue, ^{
1718 if (stopped || lostContact) {
1726 zmq_msg_t messages[2];
1734 stopBecauseLostContact(e.
what());
1753 dispatch_sync(controlQueue, ^{
1754 if (stopped || lostContact) {
1762 zmq_msg_t messages[2];
1770 stopBecauseLostContact(e.
what());
1789 dispatch_sync(controlQueue, ^{
1790 if (stopped || lostContact) {
1798 zmq_msg_t messages[1];
1805 stopBecauseLostContact(e.
what());
1826 dispatch_sync(controlQueue, ^{
1827 if (stopped || lostContact) {
1835 zmq_msg_t messages[1];
1842 stopBecauseLostContact(e.
what());
1861 dispatch_sync(controlQueue, ^{
1862 if (stopped || lostContact) {
1870 zmq_msg_t messages[1];
1877 stopBecauseLostContact(e.
what());
1898 dispatch_sync(controlQueue, ^{
1899 if (stopped || lostContact) {
1907 zmq_msg_t messages[1];
1914 stopBecauseLostContact(e.
what());
1937 for (
auto i : portsAndValuesToSet)
1939 string portName = i.first->getName();
1940 if (portName ==
"width")
1941 p->
lastWidth = json_object_get_int64(i.second);
1942 else if (portName ==
"height")
1943 p->
lastHeight = json_object_get_int64(i.second);
1944 else if (portName ==
"image" || portName ==
"startImage")
1947 if (json_object_object_get_ex(i.second,
"pixelsWide", &o))
1948 p->
lastWidth = json_object_get_int64(o);
1949 if (json_object_object_get_ex(i.second,
"pixelsHigh", &o))
1951 if (json_object_object_get_ex(i.second,
"pointer", &o))
1960 dispatch_sync(controlQueue, ^{
1961 if (stopped || lostContact) {
1969 int messageCount = portsAndValuesToSet.size() * 2;
1970 zmq_msg_t messages[messageCount];
1973 for (
auto &kv : portsAndValuesToSet)
1984 stopBecauseLostContact(e.
what());
2000 set<VuoRunner::Port *> portAsSet;
2001 portAsSet.insert(port);
2019 dispatch_sync(controlQueue, ^{
2020 if (stopped || lostContact) {
2026 lastFiredEventSignaled =
false;
2030 size_t messageCount = ports.size() + 1;
2031 zmq_msg_t messages[messageCount];
2044 stopBecauseLostContact(e.
what());
2077 saturating_semaphore_wait(lastFiredEventSemaphore, &lastFiredEventSignaled);
2097 __block
string valueAsString;
2098 dispatch_sync(controlQueue, ^{
2099 if (stopped || lostContact) {
2107 zmq_msg_t messages[2];
2112 valueAsString = receiveString(
"null");
2116 stopBecauseLostContact(e.
what());
2119 return json_tokener_parse(valueAsString.c_str());
2139 __block
string valueAsString;
2140 dispatch_sync(controlQueue, ^{
2141 if (stopped || lostContact) {
2149 zmq_msg_t messages[2];
2154 valueAsString = receiveString(
"null");
2158 stopBecauseLostContact(e.
what());
2163 json_object *js = json_tokener_parse(valueAsString.c_str());
2167 uint64_t actualWidth = 0;
2168 if (json_object_object_get_ex(js,
"pixelsWide", &o))
2169 actualWidth = json_object_get_int64(o);
2170 uint64_t actualHeight = 0;
2171 if (json_object_object_get_ex(js,
"pixelsHigh", &o))
2172 actualHeight = json_object_get_int64(o);
2173 if (json_object_object_get_ex(js,
"pointer", &o))
2176 actualWidth = vi->pixelsWide;
2177 actualHeight = vi->pixelsHigh;
2184 p->vuoImageMakeFromJsonWithDimensions = (Private::vuoImageMakeFromJsonWithDimensionsType)dlsym(RTLD_SELF,
"VuoImage_makeFromJsonWithDimensions");
2185 if (!p->vuoImageMakeFromJsonWithDimensions)
2187 VUserLog(
"Error: Couldn't find VuoImage_makeFromJsonWithDimensions.");
2194 VUserLog(
"Error: Couldn't find VuoImage_getInterprocessJson.");
2203 json_object_put(js);
// Returns the stored list of published input or output ports, calling refreshPublishedPorts()
// only when the corresponding "cached" flag is still false.
// NOTE(review): the branch that selects between the two halves is not visible here; presumably `input == true` selects the input half — confirm.
2226vector<VuoRunner::Port *> VuoRunner::getCachedPublishedPorts(
bool input)
// Input ports: fetch once, mark as cached, then return the stored vector (by value).
2234 if (! arePublishedInputPortsCached)
2236 publishedInputPorts = refreshPublishedPorts(
true);
2237 arePublishedInputPortsCached =
true;
2239 return publishedInputPorts;
// Output ports: same pattern with the output-side members.
2243 if (! arePublishedOutputPortsCached)
2245 publishedOutputPorts = refreshPublishedPorts(
false);
2246 arePublishedOutputPortsCached =
true;
2248 return publishedOutputPorts;
// Builds a fresh list of published ports from three string lists received from the composition,
// guarded by a timer that reports lost contact if it fires first.
// The request that precedes the receive calls is not visible in these lines.
2263vector<VuoRunner::Port *> VuoRunner::refreshPublishedPorts(
bool input)
2265 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
// Watchdog timer: first fires 5 s from now, with a 1 s repeat interval and 0.1 s leeway.
2266 dispatch_source_t timeout = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
2267 dispatch_source_set_timer(timeout, dispatch_time(DISPATCH_TIME_NOW, 5*NSEC_PER_SEC), NSEC_PER_SEC, NSEC_PER_SEC/10);
// When the timer fires, report lost contact and cancel the timer so it does not fire again.
2268 dispatch_source_set_event_handler(timeout, ^{
2269 stopBecauseLostContact(
"The connection between the composition and runner timed out when trying to receive the list of published ports");
2270 dispatch_source_cancel(timeout);
// Dispatch sources are created suspended; resuming starts the countdown.
2272 dispatch_resume(timeout);
2274 vector<VuoRunner::Port *> ports;
2301 vector<string> names;
2302 vector<string> types;
2303 vector<string> details;
// Three lists are received in turn and stored as names, types and details.
// NOTE(review): the conditions mapping each iteration to a vector are not visible; presumably 0 = names, 1 = types, 2 = details — confirm.
2305 for (
int i = 0; i < 3; ++i)
2309 vector<string> messageStrings = receiveListOfStrings();
2311 names = messageStrings;
2313 types = messageStrings;
2315 details = messageStrings;
// Build one Port per index present in all three lists; the details string is parsed as JSON.
// Each Port is allocated with `new`, and nothing in the visible lines deletes it.
2318 for (
size_t i = 0; i < names.size() && i < types.size() && i < details.size(); ++i)
2320 VuoRunner::Port *port =
new Port(names[i], types[i], json_tokener_parse(details[i].c_str()));
2321 ports.push_back(port);
// The timer is cancelled and released at two places.
// NOTE(review): presumably one is the normal path and the other an error path; the surrounding control flow is not visible — confirm.
2326 dispatch_source_cancel(timeout);
2327 dispatch_release(timeout);
2331 dispatch_source_cancel(timeout);
2332 dispatch_release(timeout);
2350 return getCachedPublishedPorts(
true);
2366 return getCachedPublishedPorts(
false);
2383 for (vector<VuoRunner::Port *>::iterator i = inputPorts.begin(); i != inputPorts.end(); ++i)
2384 if ((*i)->getName() == name)
2404 for (vector<VuoRunner::Port *>::iterator i = outputPorts.begin(); i != outputPorts.end(); ++i)
2405 if ((*i)->getName() == name)
2422void VuoRunner::listen()
2426 const char *compositionName = dylibPath.empty() ? executablePath.c_str() : dylibPath.c_str();
2429 if (
const char *lastSlash = strrchr(compositionName,
'/'))
2430 compositionName = lastSlash + 1;
2432 char threadName[MAXTHREADNAMESIZE];
2433 snprintf(threadName, MAXTHREADNAMESIZE,
"org.vuo.runner.telemetry: %s", compositionName);
2434 pthread_setname_np(threadName);
2437 ZMQSelfReceive = zmq_socket(ZMQContext, ZMQ_PAIR);
2439 if (zmq_bind(ZMQSelfReceive,
"inproc://vuo-runner-self") != 0)
2441 listenError = strerror(errno);
2442 dispatch_semaphore_signal(beganListeningSemaphore);
2446 ZMQSelfSend = zmq_socket(ZMQContext, ZMQ_PAIR);
2448 if (zmq_connect(ZMQSelfSend,
"inproc://vuo-runner-self") != 0)
2450 listenError = strerror(errno);
2451 dispatch_semaphore_signal(beganListeningSemaphore);
2456 ZMQTelemetry = zmq_socket(ZMQContext,ZMQ_SUB);
2458 if(zmq_connect(ZMQTelemetry,ZMQTelemetryURL.c_str()))
2460 listenError = strerror(errno);
2461 dispatch_semaphore_signal(beganListeningSemaphore);
2465 const int highWaterMark = 0;
2466 if(zmq_setsockopt(ZMQTelemetry,ZMQ_RCVHWM,&highWaterMark,
sizeof(highWaterMark)))
2468 listenError = strerror(errno);
2469 dispatch_semaphore_signal(beganListeningSemaphore);
2477 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2479 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2481 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2483 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2485 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2487 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2489 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2491 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2493 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2495 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type,
sizeof type);
2508 zmq_pollitem_t items[]=
2510 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2514 zmq_poll(items,itemCount,timeout);
2517 dispatch_semaphore_signal(beganListeningSemaphore);
2519 bool pendingCancel =
false;
2520 while(! listenCanceled)
2522 zmq_pollitem_t items[]=
2524 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2525 {ZMQSelfReceive,0,ZMQ_POLLIN,0},
2530 long timeout = pendingCancel ? 100 : 1000;
2531 zmq_poll(items,itemCount,timeout);
2532 if(items[0].revents & ZMQ_POLLIN)
2542 dispatch_sync(delegateQueue, ^{
2552 dispatch_sync(delegateQueue, ^{
2556 free(compositionIdentifier);
2557 free(nodeIdentifier);
2564 dispatch_sync(delegateQueue, ^{
2568 free(compositionIdentifier);
2569 free(nodeIdentifier);
2588 string portDataSummary;
2592 portDataSummary = s;
2596 portDataSummary =
"";
2598 dispatch_sync(delegateQueue, ^{
2605 free(portIdentifier);
2607 free(compositionIdentifier);
2627 string portDataSummary;
2631 portDataSummary = s;
2635 portDataSummary =
"";
2637 dispatch_sync(delegateQueue, ^{
2644 free(portIdentifier);
2646 free(compositionIdentifier);
2660 string portDataSummary;
2664 portDataSummary = s;
2668 portDataSummary =
"";
2672 dispatch_sync(delegateQueue, ^{
2678 free(portIdentifier);
2684 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2691 dispatch_sync(delegateQueue, ^{
2695 free(compositionIdentifier);
2696 free(portIdentifier);
2702 dispatch_sync(delegateQueue, ^{
2711 dispatch_sync(delegateQueue, ^{
2715 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2721 VUserLog(
"Error: Unknown telemetry message type: %d", type);
2725 else if (! listenCanceled)
2727 if (items[1].revents & ZMQ_POLLIN)
2733 pendingCancel =
true;
2736 else if (pendingCancel)
2737 listenCanceled =
true;
2745 VDebugLog(
"zmq_poll timed out, but system is sleeping so I'll try again.");
2747 VDebugLog(
"zmq_poll timed out, but a debugger is attached to the host so I'll try again.");
2750 listenCanceled =
true;
2751 string dir, file, ext;
2753 stopBecauseLostContact(
"The connection between the composition ('" + file +
"') and runner timed out while listening for telemetry.");
2759 zmq_close(ZMQTelemetry);
2760 ZMQTelemetry = NULL;
2762 zmq_close(ZMQSelfSend);
2764 zmq_close(ZMQSelfReceive);
2765 ZMQSelfReceive = NULL;
2767 dispatch_semaphore_signal(endedListeningSemaphore);
// Sends a control request, followed by `messageCount` message parts from `messages`, on the ZMQControl socket.
// NOTE(review): vuoSend is given &error; how a reported error is handled is not visible here — confirm.
2780void VuoRunner::vuoControlRequestSend(
enum VuoControlRequest request, zmq_msg_t *messages,
unsigned int messageCount)
2783 vuoSend(
"runner VuoControl",
ZMQControl,request,messages,messageCount,
false,&error);
2802void VuoRunner::vuoLoaderControlRequestSend(
enum VuoLoaderControlRequest request, zmq_msg_t *messages,
unsigned int messageCount)
2823void VuoRunner::vuoControlReplyReceive(
enum VuoControlReply expectedReply)
2833 oss << e <<
" (expected " << expectedReply <<
")";
2836 else if (reply != expectedReply)
2839 oss <<
"The runner received the wrong message from the composition (expected " << expectedReply <<
", received " << reply <<
")";
2862 oss << e <<
" (expected " << expectedReply <<
")";
2865 else if (reply != expectedReply)
2868 oss <<
"The runner received the wrong message from the composition loader (expected " << expectedReply <<
", received " << reply <<
")";
2878string VuoRunner::receiveString(
string fallbackIfNull)
2897 ret = fallbackIfNull;
// Collects strings obtained from receiveString() into a vector and returns it.
// NOTE(review): the loop condition that decides how many strings are read is not visible here.
2905vector<string> VuoRunner::receiveListOfStrings(
void)
2907 vector<string> messageStrings;
// An empty string is passed as receiveString()'s fallbackIfNull argument.
2910 string s = receiveString(
"");
2911 messageStrings.push_back(s);
2913 return messageStrings;
// Signals `dsema` only if `*signaled` was false, atomically setting it to true.
// Repeated calls while the flag is already true do not signal the semaphore again.
2921void VuoRunner::saturating_semaphore_signal(dispatch_semaphore_t dsema,
bool *signaled)
// The compare-and-swap returns true only for the caller that flips the flag from false to true.
2923 if (__sync_bool_compare_and_swap(signaled,
false,
true))
2924 dispatch_semaphore_signal(dsema);
// Blocks with no timeout until `dsema` is signaled.
// NOTE(review): `signaled` is not touched in the visible lines; a statement appears to be missing from this view — confirm whether the flag is reset here.
2932void VuoRunner::saturating_semaphore_wait(dispatch_semaphore_t dsema,
bool *signaled)
2935 dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
// Returns true when this runner has no executable path set.
// Other code in this file treats that as "the composition runs in the current process".
2951bool VuoRunner::isInCurrentProcess(
void)
2953 return executablePath.empty();
// Returns true when both an executable path and a dylib path are set.
// Elsewhere in this file that pairing holds the composition loader path and the composition dylib path.
2960bool VuoRunner::isUsingCompositionLoader(
void)
2962 return ! executablePath.empty() && ! dylibPath.empty();
2972 dispatch_sync(delegateQueue, ^{
2973 this->delegate = delegate;
// Handles loss of communication with the composition: wakes a waiter on lastFiredEventSemaphore,
// terminates the ZeroMQ context, and logs `errorMessage`.
// Several statements, including what the dispatched blocks do, are not visible in these lines.
2980void VuoRunner::stopBecauseLostContact(
string errorMessage)
// Read lostContact on delegateQueue; __block lets the block write the result back to this scope.
2982 __block
bool alreadyLostContact;
2983 dispatch_sync(delegateQueue, ^{
2984 alreadyLostContact = lostContact;
// NOTE(review): presumably returns early when contact was already lost; the statement under this `if` is not visible — confirm.
2988 if (alreadyLostContact)
// Signal lastFiredEventSemaphore (at most once, via the saturating helper) so a thread blocked on it can proceed.
2991 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2993 dispatch_sync(delegateQueue, ^{
2998 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
3002 if (! isInCurrentProcess())
3008 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
// After terminating the context, signal terminatedZMQContextSemaphore, which the stop path waits on.
3011 zmq_term(ZMQContext);
3013 dispatch_semaphore_signal(terminatedZMQContextSemaphore);
// Passed through a "%s" format so the message text is never interpreted as a format string.
3017 VUserLog(
"%s", errorMessage.c_str());
3029 return compositionPid;
3043 this->details = details;
3094VuoRunnerDelegate::~VuoRunnerDelegate() { }