Vuo  2.4.1
VuoRunner.cc
Go to the documentation of this file.
1
10#include "VuoRunner.hh"
11#include "VuoFileUtilities.hh"
12#include "VuoImage.h"
13#include "VuoStringUtilities.hh"
14#include "VuoEventLoop.h"
15#include "VuoException.hh"
16#include "VuoRuntime.h"
18
19#include <CoreServices/CoreServices.h>
20#include <pthread.h>
21#include <stdio.h>
22#include <dlfcn.h>
23#include <sstream>
24#include <copyfile.h>
25#include <mach-o/dyld.h>
26#include <mach-o/loader.h>
27#include <mach-o/nlist.h>
28#include <sys/proc_info.h>
29#include <sys/stat.h>
30
33#ifdef VUO_RUNNER_TRACE
34// Since the VuoDefer is declared on the first line of the method,
35// log line number 0 instead of the first line,
36// to avoid confusion about where the method is being exited.
37#define VuoRunnerTraceScope() VUserLog("{"); VuoDefer(^{ VuoLog(VuoLog_moduleName, __FILE__, 0, __func__, "}"); })
38#else
39#define VuoRunnerTraceScope()
40#endif
42
// The thread recorded by VuoRunner_init() (as returned by pthread_self()); isMainThread() compares the calling thread against it.
43void *VuoApp_mainThread = NULL;
// Path to Xcode's Main Thread Checker library. Runtime checking is only enabled if this file exists,
// and it is the default value of DYLD_INSERT_LIBRARIES for the composition process when runtime checking is enabled.
44static const char *mainThreadChecker = "/Applications/Xcode.app/Contents/Developer/usr/lib/libMainThreadChecker.dylib";
// NOTE(review): presumably set when the host app's bundle identifier is "com.vidvox.VDMX5" — the assignment isn't visible in this listing; confirm.
46static bool VuoRunner_isHostVDMX = false;
47
52static void VuoRunner_closeOnExec(int fd)
53{
54 int flags = fcntl(fd, F_GETFD);
55 if (flags < 0)
56 {
57 VUserLog("Error: Couldn't get flags for desciptor %d: %s", fd, strerror(errno));
58 return;
59 }
60
61 flags |= FD_CLOEXEC;
62
63 if (fcntl(fd, F_SETFD, flags) != 0)
64 VUserLog("Error: Couldn't set FD_CLOEXEC on descriptor %d: %s", fd, strerror(errno));
65}
66
// Runs automatically before main() / when this library is loaded (`__attribute__((constructor))`).
// Records the calling thread in VuoApp_mainThread, which isMainThread() later compares against.
// NOTE(review): this listing omits some of this function's original lines (e.g. the statements that the
// pipe comment below refers to, and the statement guarded by the bundle-identifier check) — consult the full source before editing.
70static void __attribute__((constructor)) VuoRunner_init()
71{
72 VuoApp_mainThread = (void *)pthread_self();
73
74#pragma clang diagnostic push
75#pragma clang diagnostic ignored "-Wdeprecated-declarations"
76 // Calls _TSGetMainThread().
77 // https://b33p.net/kosada/node/12944
78 YieldToAnyThread();
79#pragma clang diagnostic pop
80
82
83 // Ensure that the write end of this pipe gets closed upon fork()/exec(),
84 // so child processes don't prop open this pipe,
85 // which would prevent Vuo compositions from quitting when the VuoRunner process quits.
87
 // Detect whether the host application's main bundle is VDMX5.
88 if (VuoStringUtilities::makeFromCFString(CFBundleGetIdentifier(CFBundleGetMainBundle())) == "com.vidvox.VDMX5")
90}
91
95static bool isMainThread(void)
96{
97 return VuoApp_mainThread == (void *)pthread_self();
98}
99
103static void VuoRunner_configureSocket(void *zmqSocket)
104{
105 int linger = 0; // avoid having zmq_term block if the runner has tried to send a message on a broken connection
106 zmq_setsockopt(zmqSocket, ZMQ_LINGER, &linger, sizeof linger);
107}
108
113{
114public:
115 Private() :
116 lastWidth(0),
117 lastHeight(0)
118 {
119 }
120
122 typedef void *(*vuoImageMakeFromJsonWithDimensionsType)(json_object *, unsigned int, unsigned int);
124 typedef json_object *(*vuoImageGetInterprocessJsonType)(void *);
126
127 uint64_t lastWidth;
128 uint64_t lastHeight;
129};
130
141VuoRunner * VuoRunner::newSeparateProcessRunnerFromExecutable(string executablePath, string sourceDir,
142 bool continueIfRunnerDies, bool deleteExecutableWhenFinished)
143{
145
146 VuoRunner * vr = new VuoRunner();
147 vr->executablePath = executablePath;
148 vr->shouldContinueIfRunnerDies = continueIfRunnerDies;
149 vr->shouldDeleteBinariesWhenFinished = deleteExecutableWhenFinished;
150 vr->sourceDir = sourceDir;
151 return vr;
152}
153
169VuoRunner * VuoRunner::newSeparateProcessRunnerFromDynamicLibrary(string compositionLoaderPath, string compositionDylibPath,
170 const std::shared_ptr<VuoRunningCompositionLibraries> &runningCompositionLibraries,
171 string sourceDir, bool continueIfRunnerDies, bool deleteDylibsWhenFinished)
172{
174
175 VuoRunner * vr = new VuoRunner();
176 vr->executablePath = compositionLoaderPath;
177 vr->dylibPath = compositionDylibPath;
178 vr->dependencyLibraries = runningCompositionLibraries;
179 vr->sourceDir = sourceDir;
180 vr->shouldContinueIfRunnerDies = continueIfRunnerDies;
181 vr->shouldDeleteBinariesWhenFinished = deleteDylibsWhenFinished;
182 runningCompositionLibraries->setDeleteResourceLibraries(deleteDylibsWhenFinished);
183 return vr;
184}
185
196 bool deleteDylibWhenFinished)
197{
199
200 VuoRunner * vr = new VuoRunner();
201 vr->dylibPath = dylibPath;
202 vr->shouldDeleteBinariesWhenFinished = deleteDylibWhenFinished;
203 vr->sourceDir = sourceDir;
204 return vr;
205}
206
213{
215
216 dispatch_release(stoppedSemaphore);
217 dispatch_release(terminatedZMQContextSemaphore);
218 dispatch_release(beganListeningSemaphore);
219 dispatch_release(endedListeningSemaphore);
220 dispatch_release(lastFiredEventSemaphore);
221 dispatch_release(delegateQueue);
222 delete p;
223}
224
229void VuoRunner::setRuntimeChecking(bool runtimeCheckingEnabled)
230{
232
233 if (!stopped)
234 {
235 VUserLog("Error: Only call VuoRunner::setRuntimeChecking() prior to starting the composition.");
236 return;
237 }
238
239 isRuntimeCheckingEnabled = runtimeCheckingEnabled && VuoFileUtilities::fileExists(mainThreadChecker);
240}
241
245VuoRunner::VuoRunner(void)
246{
247 p = new Private;
248 dylibHandle = NULL;
249 dependencyLibraries = NULL;
250 shouldContinueIfRunnerDies = false;
251 shouldDeleteBinariesWhenFinished = false;
252 isRuntimeCheckingEnabled = false;
253 paused = true;
254 stopped = true;
255 lostContact = false;
256 listenCanceled = false;
257 stoppedSemaphore = dispatch_semaphore_create(1);
258 terminatedZMQContextSemaphore = dispatch_semaphore_create(0);
259 beganListeningSemaphore = dispatch_semaphore_create(0);
260 endedListeningSemaphore = dispatch_semaphore_create(1);
261 lastFiredEventSemaphore = dispatch_semaphore_create(0);
262 lastFiredEventSignaled = false;
263 controlQueue = dispatch_queue_create("org.vuo.runner.control", NULL);
264 ZMQContext = NULL;
265 ZMQSelfSend = NULL;
266 ZMQSelfReceive = NULL;
267 ZMQControl = NULL;
268 ZMQTelemetry = NULL;
269 ZMQLoaderControl = NULL;
270 delegate = NULL;
271 delegateQueue = dispatch_queue_create("org.vuo.runner.delegate", NULL);
272 arePublishedInputPortsCached = false;
273 arePublishedOutputPortsCached = false;
274
275 static once_flag sleepHandlersInstalled;
276 call_once(sleepHandlersInstalled, [](){
278 });
279}
280
297{
299
300 try
301 {
302 startInternal();
303
304 if (isInCurrentProcess())
305 {
306 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
307 dispatch_async(queue, ^{
308 unpause();
309 });
310 while (paused)
311 {
313 usleep(USEC_PER_SEC / 1000);
314 }
315 }
316 else
317 {
318 unpause();
319 }
320 }
321 catch (VuoException &e)
322 {
323 stopBecauseLostContact(e.what());
324 }
325}
326
346{
348
349 try
350 {
351 startInternal();
352 }
353 catch (VuoException &e)
354 {
355 stopBecauseLostContact(e.what());
356 }
357}
358
363void VuoRunner::copyDylibAndChangeId(string dylibPath, string &outputDylibPath)
364{
365 string directory, file, extension;
366 VuoFileUtilities::splitPath(dylibPath, directory, file, extension);
367
368 const int makeTmpFileExtension = 7;
369 if (file.length() > makeTmpFileExtension)
370 {
371 // makeTmpFile() appends "-XXXXXX"; make room for that.
372 string trimmedFile = file.substr(0, file.length() - makeTmpFileExtension);
373
374 bool alreadyLoaded;
375 do
376 {
377 outputDylibPath = VuoFileUtilities::makeTmpFile(trimmedFile, "dylib");
378 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
379 } while (alreadyLoaded);
380 }
381 else
382 {
383 // For short names, like those generated by VDMX, just replace the entire name with a hash.
384 // https://b33p.net/kosada/node/12917
385 bool alreadyLoaded;
386 do
387 {
388 string hash = VuoStringUtilities::makeRandomHash(file.length());
389 outputDylibPath = "/tmp/" + hash + ".dylib";
390 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
391 } while (alreadyLoaded);
392 }
393
394 string newDirectory, newFile, newExtension;
395 VuoFileUtilities::splitPath(outputDylibPath, newDirectory, newFile, newExtension);
396
397 if (newFile.length() > file.length())
398 throw VuoException("The composition couldn't start because the uniqued dylib name (" + newFile + ") is longer than the original dylib name (" + file + ").");
399
400 if (copyfile(dylibPath.c_str(), outputDylibPath.c_str(), NULL, COPYFILE_ALL))
401 throw VuoException("The composition couldn't start because a copy of the dylib couldn't be made.");
402
403 FILE *fp = fopen(outputDylibPath.c_str(), "r+b");
404 if (!fp)
405 throw VuoException("The composition couldn't start because the dylib's header couldn't be opened.");
406
407 __block bool fpClosed = false;
408 VuoDefer(^{ if (! fpClosed) fclose(fp); });
409
410 struct mach_header_64 header;
411 if (fread(&header, sizeof(header), 1, fp) != 1)
412 throw VuoException("The composition couldn't start because the dylib's header couldn't be read.");
413
414 if (header.magic != MH_MAGIC_64)
415 throw VuoException("The composition couldn't start because the dylib isn't a 64-bit Mach-O binary.");
416
417 for (int i = 0; i < header.ncmds; ++i)
418 {
419 struct load_command lc;
420 if (fread(&lc, sizeof(lc), 1, fp) != 1)
421 throw VuoException("The composition couldn't start because the dylib's command couldn't be read.");
422
423 // VLog("cmd[%d]: %x (size %d)",i,lc.cmd,lc.cmdsize);
424 if (lc.cmd == LC_ID_DYLIB)
425 {
426 fseek(fp, sizeof(struct dylib), SEEK_CUR);
427
428 size_t nameLength = lc.cmdsize - sizeof(struct dylib_command);
429 char *name = (char *)calloc(nameLength + 1, 1);
430 if (fread(name, nameLength, 1, fp) != 1)
431 throw VuoException("The composition couldn't start because the dylib's ID command couldn't be read.");
432
433// VLog("Changing name \"%s\" to \"%s\"…", name, outputDylibPath.c_str());
434 fseek(fp, -nameLength, SEEK_CUR);
435 bzero(name, nameLength);
436 memcpy(name, outputDylibPath.c_str(), min(nameLength, outputDylibPath.length()));
437 fwrite(name, nameLength, 1, fp);
438
439 fclose(fp);
440 fpClosed = true;
441
442 // Since VuoRunner doesn't have access to VuoCompiler,
443 // we can't simply call VuoCompiler::getCodesignAllocatePath().
444 // Hopefully that method will already have been called once on the system,
445 // in order to generate the cache.
446 vector<string> environment;
447 string codesignAllocatePath = VuoFileUtilities::getCachePath() + "/codesign_allocate";
448 if (VuoFileUtilities::fileExists(codesignAllocatePath))
449 environment = { "CODESIGN_ALLOCATE=" + codesignAllocatePath };
450
451 try
452 {
453 VuoFileUtilities::adHocCodeSign(outputDylibPath, environment);
454 }
455 catch (std::exception &e)
456 {
457 VUserLog("Warning: Couldn't code-sign the renamed dylib: %s", e.what());
458 }
459
460 return;
461 }
462 else
463 fseek(fp, lc.cmdsize-sizeof(lc), SEEK_CUR);
464 }
465
466 throw VuoException("The composition couldn't start because the dylib's LC_ID_DYLIB command couldn't be found.");
467}
468
472int64_t VuoRunner_getDylibVMSize(const struct mach_header_64 *header)
473{
474 if (header->magic != MH_MAGIC_64)
475 return 0;
476
477 struct load_command *lc = (struct load_command *)((char *)header + sizeof(struct mach_header_64));
478 int64_t maxExtent = 0;
479 for (int i = 0; i < header->ncmds; ++i)
480 {
481 if (lc->cmd == LC_SEGMENT_64)
482 {
483 struct segment_command_64 *seg = (struct segment_command_64 *)lc;
484 maxExtent = MAX(maxExtent, seg->vmaddr + seg->vmsize);
485 }
486 else if (lc->cmd == LC_CODE_SIGNATURE
487 || lc->cmd == LC_SEGMENT_SPLIT_INFO
488 || lc->cmd == LC_FUNCTION_STARTS
489 || lc->cmd == LC_DATA_IN_CODE
490 || lc->cmd == LC_DYLIB_CODE_SIGN_DRS
491 || lc->cmd == LC_LINKER_OPTIMIZATION_HINT
492 || lc->cmd == LC_DYLD_EXPORTS_TRIE
493 || lc->cmd == LC_DYLD_CHAINED_FIXUPS)
494 {
495 struct linkedit_data_command *data = (struct linkedit_data_command *)lc;
496 maxExtent = MAX(maxExtent, data->dataoff + data->datasize);
497 }
498 else if (lc->cmd == LC_SYMTAB)
499 {
500 struct symtab_command *symtab = (struct symtab_command *)lc;
501 maxExtent = MAX(maxExtent, symtab->symoff + symtab->nsyms * sizeof(struct nlist_64));
502 maxExtent = MAX(maxExtent, symtab->stroff + symtab->strsize);
503 }
504 // AFAIK other load commands don't affect VM size.
505
506 lc = (struct load_command *)((char *)lc + lc->cmdsize);
507 }
508 return maxExtent;
509}
510
512
// Logs one line describing a dylib: its address range (`vmaddr_slide` through `vmaddr_slide` + size),
// its size as computed by VuoRunner_getDylibVMSize(), and its file name as reported by dladdr()
// (with a leading $HOME abbreviated to "~").
// `func` is passed through to VuoLog() as the function name to attribute the log line to.
516static void VuoRunner_logDylibInfo(const struct mach_header_64 *mh, intptr_t vmaddr_slide, const char *func)
517{
 // NOTE(review): the original line preceding this `return` is not present in this listing
 // (presumably a condition guarding it) — as shown, everything below would be unreachable; confirm against the full source.
519 return;
520
521 // Ignore system libraries.
522 if (mh->flags & MH_DYLIB_IN_CACHE)
523 return;
524
525 const char *homeZ = getenv("HOME");
526 string home;
527 if (homeZ)
528 home = homeZ;
529
 // Pre-fill with empty strings so `dli_fname` is a valid C string even if dladdr() leaves it untouched.
530 Dl_info info{"", nullptr, "", nullptr};
531 dladdr((void *)vmaddr_slide, &info);
532 string filename{info.dli_fname};
533 if (VuoStringUtilities::beginsWith(filename, home))
534 filename = "~" + VuoStringUtilities::substrAfter(filename, home);
535
536 int64_t size = VuoRunner_getDylibVMSize(mh);
537
538 VuoLog(VuoLog_moduleName, __FILE__, __LINE__, func, "%16lx - %16lx (%lld bytes) %s", vmaddr_slide, (intptr_t)((char *)vmaddr_slide + size), size, filename.c_str());
539}
540
544void VuoRunner_dylibLoaded(const struct mach_header *mh, intptr_t vmaddr_slide)
545{
546 VuoRunner_logDylibInfo((struct mach_header_64 *)mh, vmaddr_slide, __func__);
547}
548
552void VuoRunner_dylibUnloaded(const struct mach_header *mh, intptr_t vmaddr_slide)
553{
554 VuoRunner_logDylibInfo((struct mach_header_64 *)mh, vmaddr_slide, __func__);
555}
556
566void VuoRunner::startInternal(void)
567{
568 stopped = false;
569 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
570
571 ZMQContext = zmq_init(1);
572
573 if (isInCurrentProcess())
574 {
575 // Start the composition in the current process.
576
577 static once_flag dylibLoggerInitialized;
578 call_once(dylibLoggerInitialized, [](){
579 // Start logging info about the current process's dylibs, to assist in debugging.
581 _dyld_register_func_for_add_image(VuoRunner_dylibLoaded);
582 _dyld_register_func_for_remove_image(VuoRunner_dylibUnloaded);
584 });
585
586 bool alreadyLoaded = dlopen(dylibPath.c_str(), RTLD_NOLOAD);
587 if (alreadyLoaded)
588 {
589 // Each composition instance needs its own global variables.
590 // Change the dylib's internal name, to convince dlopen() to load another instance of it.
591
592 string uniquedDylibPath;
593 copyDylibAndChangeId(dylibPath, uniquedDylibPath);
594 VDebugLog("\"%s\" is already loaded, so I duplicated it and changed its LC_ID_DYLIB to \"%s\".", dylibPath.c_str(), uniquedDylibPath.c_str());
595
596 if (shouldDeleteBinariesWhenFinished)
597 remove(dylibPath.c_str());
598
599 dylibPath = uniquedDylibPath;
600 shouldDeleteBinariesWhenFinished = true;
601 }
602
603 dylibHandle = dlopen(dylibPath.c_str(), RTLD_NOW);
604 if (!dylibHandle)
605 throw VuoException("The composition couldn't start because the library '" + dylibPath + "' couldn't be loaded : " + dlerror());
606
607 try
608 {
610 if (! vuoInitInProcess)
611 throw VuoException("The composition couldn't start because vuoInitInProcess() couldn't be found in '" + dylibPath + "' : " + dlerror());
612
613 ZMQControlURL = "inproc://" + VuoFileUtilities::makeTmpFile("vuo-control", "");
614 ZMQTelemetryURL = "inproc://" + VuoFileUtilities::makeTmpFile("vuo-telemetry", "");
615
616 vuoInitInProcess(ZMQContext, ZMQControlURL.c_str(), ZMQTelemetryURL.c_str(), true, getpid(), -1, false,
617 sourceDir.c_str(), dylibHandle, NULL, false);
618 }
619 catch (VuoException &e)
620 {
621 VUserLog("error: %s", e.what());
622 dlclose(dylibHandle);
623 dylibHandle = NULL;
624 throw;
625 }
626 }
627 else
628 {
629 // Start the composition or composition loader in a new process.
630
631 vector<string> args;
632
633 string executableName;
634 if (isUsingCompositionLoader())
635 {
636 // If we're using the loader, set the executable's display name to the dylib,
637 // so that composition's name shows up in the process list.
638 string dir, file, ext;
639 VuoFileUtilities::splitPath(dylibPath, dir, file, ext);
640 executableName = file;
641 }
642 else
643 {
644 string dir, file, ext;
645 VuoFileUtilities::splitPath(executablePath, dir, file, ext);
646 string executableName = file;
647 if (! ext.empty())
648 executableName += "." + ext;
649 }
650 args.push_back(executableName);
651
652 // https://b33p.net/kosada/node/16374
653 // The socket's full pathname (`sockaddr_un::sun_path`) must be 104 characters or less
654 // (https://opensource.apple.com/source/xnu/xnu-2782.1.97/bsd/sys/un.h.auto.html).
655 // "/Users/me/Library/Containers/com.apple.ScreenSaver.Engine.legacyScreenSaver/Data/vuo-telemetry-rr8Br3"
656 // is 101 characters, which limits the username to 5 characters.
657 // "/Users/me/Library/Containers/com.apple.ScreenSaver.Engine.legacyScreenSaver/Data/v-rr8Br3"
658 // is 89 characters, which limits the username to 17 characters
659 // (still not a lot, but more likely to work with typical macOS usernames).
660 ZMQControlURL = "ipc://" + VuoFileUtilities::makeTmpFile("v", "");
661 ZMQTelemetryURL = "ipc://" + VuoFileUtilities::makeTmpFile("v", "");
662 args.push_back("--vuo-control=" + ZMQControlURL);
663 args.push_back("--vuo-telemetry=" + ZMQTelemetryURL);
664
665 {
666 ostringstream oss;
667 oss << getpid();
668 args.push_back("--vuo-runner-pid=" + oss.str());
669 }
670
671 {
672 ostringstream oss;
674 args.push_back("--vuo-runner-pipe=" + oss.str());
675 }
676
677 if (shouldContinueIfRunnerDies)
678 args.push_back("--vuo-continue-if-runner-dies");
679
680 if (isUsingCompositionLoader())
681 {
682 ZMQLoaderControlURL = "ipc://" + VuoFileUtilities::makeTmpFile("v", "");
683 args.push_back("--vuo-loader=" + ZMQLoaderControlURL);
684 }
685 else
686 args.push_back("--vuo-pause");
687
688 int fd[2];
689 int ret = pipe(fd);
690 if (ret)
691 throw VuoException("The composition couldn't start because a pipe couldn't be opened : " + string(strerror(errno)));
692
693 int argSize = args.size();
694 char *argv[argSize + 1];
695 for (size_t i = 0; i < argSize; ++i)
696 {
697 size_t mallocSize = args[i].length() + 1;
698 argv[i] = (char *)malloc(mallocSize);
699 strlcpy(argv[i], args[i].c_str(), mallocSize);
700 }
701 argv[argSize] = NULL;
702
703 string errorWorkingDirectory = "The composition couldn't start because the working directory couldn't be changed to '" + sourceDir + "' : ";
704 string errorExecutable = "The composition couldn't start because the file '" + executablePath + "' couldn't be executed : ";
705 string errorFork = "The composition couldn't start because the composition process couldn't be forked : ";
706 const size_t ERROR_BUFFER_LEN = 256;
707 char errorBuffer[ERROR_BUFFER_LEN];
708
709 pipe(runnerReadCompositionWritePipe);
710
711 pid_t childPid = fork();
712 if (childPid == 0)
713 {
714 // There are only a limited set of functions you're allowed to call in the child process
715 // after fork() and before exec(). Functions such as VUserLog() and exit() aren't allowed,
716 // so instead we're calling alternatives such as write() and _exit().
717
718 close(runnerReadCompositionWritePipe[0]);
719
720 pid_t grandchildPid = fork();
721 if (grandchildPid == 0)
722 {
723 close(fd[0]);
724 close(fd[1]);
725
726 // Set the current working directory to that of the source .vuo composition so that
727 // relative URL paths are resolved correctly.
728 if (!sourceDir.empty())
729 {
730 ret = chdir(sourceDir.c_str());
731 if (ret)
732 {
733 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
734 write(STDERR_FILENO, errorWorkingDirectory.c_str(), errorWorkingDirectory.length());
735 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
736 write(STDERR_FILENO, "\n", 1);
737 _exit(-1);
738 }
739 }
740
741 if (isRuntimeCheckingEnabled)
742 {
743 const char *vuoRuntimeChecking = getenv("VUO_RUNTIME_CHECKING");
744 if (vuoRuntimeChecking)
745 setenv("DYLD_INSERT_LIBRARIES", vuoRuntimeChecking, 1);
746 else
747 setenv("DYLD_INSERT_LIBRARIES", mainThreadChecker, 1);
748 }
749
750 ret = execv(executablePath.c_str(), argv);
751 if (ret)
752 {
753 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
754 write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
755 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
756 write(STDERR_FILENO, "\n", 1);
757 for (size_t i = 0; i < argSize; ++i)
758 free(argv[i]);
759 _exit(-1);
760 }
761 }
762 else if (grandchildPid > 0)
763 {
764 close(fd[0]);
765
766 int ret = write(fd[1], &grandchildPid, sizeof(pid_t));
767 if (ret != sizeof(pid_t))
768 {
769 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
770 write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
771 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
772 write(STDERR_FILENO, "\n", 1);
773 }
774 close(fd[1]);
775
776 _exit(0);
777 }
778 else
779 {
780 close(fd[0]);
781 close(fd[1]);
782
783 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
784 write(STDERR_FILENO, errorFork.c_str(), errorFork.length());
785 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
786 write(STDERR_FILENO, "\n", 1);
787 _exit(-1);
788 }
789 }
790 else if (childPid > 0)
791 {
792 close(fd[1]);
793
794 // If this process launches compositions in addition to this one,
795 // ensure they don't prop open this pipe,
796 // which would prevent VuoRunner::stop's `read()` from terminating.
797 VuoRunner_closeOnExec(runnerReadCompositionWritePipe[1]);
798
799 for (size_t i = 0; i < argSize; ++i)
800 free(argv[i]);
801
802 pid_t grandchildPid = 0;
803 int ret = read(fd[0], &grandchildPid, sizeof(pid_t));
804 if (ret != sizeof(pid_t))
805 throw VuoException("The composition couldn't start because the composition process id couldn't be obtained: " + string(strerror(errno)));
806 close(fd[0]);
807
808 // Reap the child process.
809 int status;
810 do {
811 ret = waitpid(childPid, &status, 0);
812 } while (ret == -1 && errno == EINTR);
813 if (WIFEXITED(status) && WEXITSTATUS(status))
814 throw VuoException("The composition couldn't start because the parent of the composition process exited with an error.");
815 else if (WIFSIGNALED(status))
816 throw VuoException("The composition couldn't start because the parent of the composition process exited abnormally : " + string(strsignal(WTERMSIG(status))));
817
818 if (grandchildPid > 0)
819 compositionPid = grandchildPid;
820 else
821 throw VuoException("The composition couldn't start because the composition process id couldn't be obtained");
822 }
823 else
824 {
825 for (size_t i = 0; i < argSize; ++i)
826 free(argv[i]);
827
828 throw VuoException("The composition couldn't start because the parent of the composition process couldn't be forked : " + string(strerror(errno)));
829 }
830 }
831
832 // Connect to the composition loader (if any) and composition.
833 if (isUsingCompositionLoader())
834 {
835 ZMQLoaderControl = zmq_socket(ZMQContext,ZMQ_REQ);
837
838 // Try to connect to the composition loader. If at first we don't succeed, give the composition loader a little more time to set up the socket.
839 int numTries = 0;
840 while (zmq_connect(ZMQLoaderControl,ZMQLoaderControlURL.c_str()))
841 {
842 if (++numTries == 1000)
843 throw VuoException("The composition couldn't start because the runner couldn't establish communication with the composition loader : " + string(strerror(errno)));
844 usleep(USEC_PER_SEC / 1000);
845 }
846
847 // Actually start the composition.
848 // Since we don't know how long this will take, the socket has an infinite timeout (https://b33p.net/kosada/vuo/vuo/-/issues/18450).
849 replaceComposition(dylibPath, "");
850 }
851 else
852 {
853 __block string errorMessage;
854 dispatch_sync(controlQueue, ^{
855 try {
856 setUpConnections();
857 } catch (VuoException &e) {
858 errorMessage = e.what();
859 }
860 });
861 if (! errorMessage.empty())
862 throw VuoException(errorMessage);
863 }
864}
865
870void *VuoRunner_listen(void *context)
871{
872 pthread_detach(pthread_self());
873 VuoRunner *runner = static_cast<VuoRunner *>(context);
874 runner->listen();
875 return NULL;
876}
877
883void VuoRunner::setUpConnections(void)
884{
885 ZMQControl = zmq_socket(ZMQContext,ZMQ_REQ);
887
888 // Try to connect to the composition. If at first we don't succeed, give the composition a little more time to set up the socket.
889 int numTries = 0;
890 while (zmq_connect(ZMQControl,ZMQControlURL.c_str()))
891 {
892 if (++numTries == 1000)
893 throw VuoException("The composition couldn't start because the runner couldn't establish communication to control the composition : " + string(strerror(errno)));
894 usleep(USEC_PER_SEC / 1000);
895 }
896
897 // Cache published ports so they're available whenever a caller starts listening for published port value changes.
898 arePublishedInputPortsCached = false;
899 arePublishedOutputPortsCached = false;
900 if (isInCurrentProcess())
901 {
902 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
903 __block string publishedPortsError;
904 dispatch_async(queue, ^{
905 try {
906 getCachedPublishedPorts(false);
907 getCachedPublishedPorts(true);
908 } catch (VuoException &e) {
909 publishedPortsError = e.what();
910 }
911 });
912 while (! (arePublishedInputPortsCached && arePublishedOutputPortsCached) )
913 {
915 usleep(USEC_PER_SEC / 1000);
916
917 if (! publishedPortsError.empty())
918 throw VuoException(publishedPortsError);
919 }
920 }
921 else
922 {
923 getCachedPublishedPorts(false);
924 getCachedPublishedPorts(true);
925 }
926
927 listenCanceled = false;
928 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
929
930 pthread_t listenThread;
931 int ret = pthread_create(&listenThread, nullptr, &VuoRunner_listen, this);
932 if (ret)
933 throw VuoException(string("The composition couldn't start because the runner couldn't create a thread: ") + strerror(ret));
934
935 dispatch_semaphore_wait(beganListeningSemaphore, DISPATCH_TIME_FOREVER);
936 if (!listenError.empty())
937 throw VuoException("The composition couldn't start because the runner couldn't establish communication to listen to the composition: " + listenError);
938
941}
942
958{
960
961 if (! isInCurrentProcess())
962 throw VuoException("The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
963
964 if (! isMainThread())
965 throw VuoException("This is not the main thread. Only call this function from the main thread.");
966
967 while (! stopped)
968 VuoEventLoop_processEvent(VuoEventLoop_WaitIndefinitely);
969}
970
996{
998
999 if (! isInCurrentProcess())
1000 throw VuoException("The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
1001
1002 if (! isMainThread())
1003 throw VuoException("This is not the main thread. Only call this function from the main thread.");
1004
1005 VuoEventLoop_processEvent(VuoEventLoop_RunOnce);
1006}
1007
1016{
1018
1019 dispatch_sync(controlQueue, ^{
1020 if (stopped || lostContact) {
1021 return;
1022 }
1023
1025
1026 try
1027 {
1030 }
1031 catch (VuoException &e)
1032 {
1033 stopBecauseLostContact(e.what());
1034 }
1035
1036 paused = true;
1037 });
1038}
1039
1046{
1048
1049 dispatch_sync(controlQueue, ^{
1050 if (stopped || lostContact) {
1051 return;
1052 }
1053
1055
1056 try
1057 {
1060 }
1061 catch (VuoException &e)
1062 {
1063 stopBecauseLostContact(e.what());
1064 }
1065
1066 paused = false;
1067 });
1068}
1069
// Asks the composition loader to replace the running composition with the one at `compositionDylibPath`,
// passing along `compositionDiff` and the dependency libraries queued for loading/unloading.
//
// Throws VuoException if the runner isn't using a composition loader.
// Does nothing if the composition is stopped or contact with it has been lost.
// A VuoException raised while communicating is handled by calling stopBecauseLostContact() rather than being propagated.
// The work is performed synchronously on `controlQueue`.
//
// NOTE(review): this listing omits some of this function's original lines (e.g. after the "Pausing…" and
// "Unpausing…" log statements) — consult the full source before editing.
1089void VuoRunner::replaceComposition(string compositionDylibPath, string compositionDiff)
1090{
1092
1093 if (! isUsingCompositionLoader())
1094 throw VuoException("The runner is not using a composition loader. Only use this function if the composition was constructed with newSeparateProcessRunnerFromDynamicLibrary().");
1095
1096 dispatch_sync(controlQueue, ^{
1097 if (stopped || lostContact) {
1098 return;
1099 }
1100
1101 VUserLog("Loading composition…");
1102
 // Switch to the new dylib, deleting the previous one if this runner is responsible for cleaning up binaries.
1103 if (dylibPath != compositionDylibPath)
1104 {
1105 if (shouldDeleteBinariesWhenFinished)
1106 {
1107 remove(dylibPath.c_str());
1108 }
1109
1110 dylibPath = compositionDylibPath;
1111 }
1112
1114
1115 try
1116 {
1117 if (! paused)
1118 {
1119 VUserLog(" Pausing…");
1122 }
1123
 // Disconnect from the old composition; setUpConnections() below reconnects to the new one.
1124 cleanUpConnections();
1125
1126 vector<string> dependencyDylibPathsRemoved = dependencyLibraries->dequeueLibrariesToUnload();
1127 vector<string> dependencyDylibPathsAdded = dependencyLibraries->dequeueLibrariesToLoad();
1128
 // Request layout: composition dylib path, count of added libraries, each added path,
 // count of removed libraries, each removed path, composition diff — hence the 4 fixed parts.
 // NOTE(review): `messages` is malloc'd here and not freed in this function — confirm that vuoLoaderControlRequestSend() takes ownership.
1129 unsigned int messageCount = 4 + dependencyDylibPathsAdded.size() + dependencyDylibPathsRemoved.size();
1130 zmq_msg_t *messages = (zmq_msg_t *)malloc(messageCount * sizeof(zmq_msg_t));
1131 int index = 0;
1132
1133 vuoInitMessageWithString(&messages[index++], dylibPath.c_str());
1134
1135 vuoInitMessageWithInt(&messages[index++], dependencyDylibPathsAdded.size());
1136 for (vector<string>::iterator i = dependencyDylibPathsAdded.begin(); i != dependencyDylibPathsAdded.end(); ++i) {
1137 vuoInitMessageWithString(&messages[index++], (*i).c_str());
1138 }
1139
1140 vuoInitMessageWithInt(&messages[index++], dependencyDylibPathsRemoved.size());
1141 for (vector<string>::iterator i = dependencyDylibPathsRemoved.begin(); i != dependencyDylibPathsRemoved.end(); ++i) {
1142 vuoInitMessageWithString(&messages[index++], (*i).c_str());
1143 }
1144
1145 vuoInitMessageWithString(&messages[index], compositionDiff.c_str());
1146
1147 if (! paused)
1148 VUserLog(" Replacing composition…");
1149
 // Send the request and wait for the loader's reply.
1150 vuoLoaderControlRequestSend(VuoLoaderControlRequestCompositionReplace,messages,messageCount);
1151 vuoLoaderControlReplyReceive(VuoLoaderControlReplyCompositionReplaced);
1152
1153 setUpConnections();
1154
1155 if (! paused)
1156 {
1157 VUserLog(" Unpausing…");
1160 }
1161
1162 VUserLog(" Done.");
1163 }
1164 catch (VuoException &e)
1165 {
1166 stopBecauseLostContact(e.what());
1167 }
1168 });
1169}
1170
1185{
1187
1188 dispatch_sync(controlQueue, ^{
1189 if (stopped) {
1190 return;
1191 }
1192
1194
1195 // Only tell the composition to stop if it hasn't already ended on its own.
1196 if (! lostContact)
1197 {
1198 try
1199 {
1200 int timeoutInSeconds = (isInCurrentProcess() ? -1 : 5);
1201 zmq_msg_t messages[2];
1202 vuoInitMessageWithInt(&messages[0], timeoutInSeconds);
1203 vuoInitMessageWithBool(&messages[1], false); // isBeingReplaced
1205
1206 if (isInCurrentProcess() && isMainThread())
1207 {
1208 // If VuoRunner::stop() is blocking the main thread, wait for the composition's reply on another thread, and drain the main queue, in case the composition needs to shut down stuff that requires the main queue.
1209 __block bool replyReceived = false;
1210 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
1212 try
1213 {
1215 }
1216 catch (...)
1217 {
1218 // do nothing; doesn't matter if connection timed out
1219 }
1220 replyReceived = true;
1221 });
1222 while (!replyReceived)
1223 {
1225 usleep(USEC_PER_SEC / 1000);
1226 }
1228 }
1229 else
1231 }
1232 catch (...)
1233 {
1234 // do nothing; doesn't matter if connection timed out
1235 }
1236 }
1237
1238 cleanUpConnections();
1239
1240 if (isUsingCompositionLoader() && ZMQLoaderControl)
1241 {
1242 zmq_close(ZMQLoaderControl);
1243 ZMQLoaderControl = NULL;
1244 }
1245
1246 if (isInCurrentProcess() && dylibHandle)
1247 {
1249 if (vuoInitInProcess) // Avoid double jeopardy if startInternal() already failed for missing vuoInitInProcess.
1250 {
1251 VuoFiniType *vuoFini = (VuoFiniType *)dlsym(dylibHandle, "vuoFini");
1252 if (! vuoFini)
1253 {
1254 VUserLog("The composition couldn't stop because vuoFini() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1255 return;
1256 }
1257 void *runtimeState = vuoFini();
1258
1260 if (! vuoFiniRuntimeState)
1261 {
1262 VUserLog("The composition couldn't stop because vuoFiniRuntimeState() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1263 return;
1264 }
1266 }
1267
1268 dlclose(dylibHandle);
1269 dylibHandle = NULL;
1270 }
1271 else if (isInCurrentProcess() && !dylibHandle)
1272 {
1273 // If the dylib isn't open, the composition isn't running, so there's nothing to clean up.
1274 }
1275 else
1276 {
1277 char buf[1];
1278 close(runnerReadCompositionWritePipe[1]);
1279
1280 if (! lostContact)
1281 {
1282 // Wait for child process to end.
1283 // Can't use waitpid() since it only waits on child processes, yet compositionPid is a grandchild.
1284 // Instead, do a blocking read() — the grandchild never writes anything to the pipe, and when the grandchild exits,
1285 // read() will return EOF (since it was the last process that had it open for writing).
1286 read(runnerReadCompositionWritePipe[0], &buf, 1);
1287 }
1288
1289 close(runnerReadCompositionWritePipe[0]);
1290
1291 if (! lostContact)
1292 {
1293 zmq_term(ZMQContext);
1294 ZMQContext = NULL;
1295 }
1296 else
1297 {
1298 dispatch_semaphore_wait(terminatedZMQContextSemaphore, DISPATCH_TIME_FOREVER);
1299 }
1300 }
1301
1302 if (shouldDeleteBinariesWhenFinished)
1303 {
1304 if (isUsingCompositionLoader())
1305 {
1306 remove(dylibPath.c_str());
1307 }
1308 else if (isInCurrentProcess())
1309 {
1310 remove(dylibPath.c_str());
1311 }
1312 else
1313 {
1314 remove(executablePath.c_str());
1315 }
1316 }
1317
1318 dependencyLibraries = nullptr; // release shared_ptr
1319
1320 stopped = true;
1321 dispatch_semaphore_signal(stoppedSemaphore);
1323 });
1324}
1325
1329void VuoRunner::cleanUpConnections(void)
1330{
1331 if (! ZMQControl)
1332 return;
1333
1334 zmq_close(ZMQControl);
1335 ZMQControl = NULL;
1336
1337 if (ZMQSelfSend)
1338 // Break out of zmq_poll().
1339 vuoSend("VuoRunner::ZMQSelfSend", ZMQSelfSend, 0, nullptr, 0, false, nullptr);
1340
1341 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
1342 dispatch_semaphore_signal(endedListeningSemaphore);
1343}
1344
1351{
1353
1354 dispatch_retain(stoppedSemaphore);
1355 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
1356 dispatch_semaphore_signal(stoppedSemaphore);
1357 dispatch_release(stoppedSemaphore);
1358}
1359
/**
 * Prepares the control request that carries a new value for one input port.
 *
 * @a value is serialized to a plain JSON string. Then, synchronously on `controlQueue`,
 * three messages are initialized in order: the composition identifier, the port identifier,
 * and the serialized value. The block returns early if the runner is stopped or has lost contact.
 * A VuoException thrown inside the block is handled by calling stopBecauseLostContact().
 *
 * NOTE(review): the statements that actually send `messages` do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the first message.
 * @param portIdentifier Text placed in the second message.
 * @param value JSON object whose plain-text serialization is placed in the third message.
 */
1375void VuoRunner::setInputPortValue(string compositionIdentifier, string portIdentifier, json_object *value)
1376{
1378
1379 const char *valueAsString = json_object_to_json_string_ext(value, JSON_C_TO_STRING_PLAIN);
1380
1381 dispatch_sync(controlQueue, ^{
1382 if (stopped || lostContact) {
1383 return;
1384 }
1385
1387
1388 try
1389 {
1390 zmq_msg_t messages[3];
1391 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1392 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1393 vuoInitMessageWithString(&messages[2], valueAsString);
1396 }
1397 catch (VuoException &e)
1398 {
1399 stopBecauseLostContact(e.what());
1400 }
1401 });
1402}
1403
/**
 * Prepares the control request for firing an event from a trigger port.
 *
 * Synchronously on `controlQueue`, two messages are initialized in order: the composition
 * identifier and the port identifier. The block returns early if the runner is stopped or
 * has lost contact. A VuoException thrown inside the block is handled by calling
 * stopBecauseLostContact().
 *
 * NOTE(review): the statements that actually send `messages` do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the first message.
 * @param portIdentifier Text placed in the second message.
 */
1417void VuoRunner::fireTriggerPortEvent(string compositionIdentifier, string portIdentifier)
1418{
1420
1421 dispatch_sync(controlQueue, ^{
1422 if (stopped || lostContact) {
1423 return;
1424 }
1425
1427
1428 try
1429 {
1430 zmq_msg_t messages[2];
1431 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1432 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1435 }
1436 catch (VuoException &e)
1437 {
1438 stopBecauseLostContact(e.what());
1439 }
1440 });
1441}
1442
/**
 * Requests an input port's value and returns it parsed as JSON.
 *
 * Synchronously on `controlQueue`, three messages are initialized in order: a boolean that is
 * true when the composition is not in the current process, the composition identifier, and the
 * port identifier. The reply text is then read with receiveString("null").
 * The block returns early if the runner is stopped or has lost contact; `valueAsString` is then
 * still empty when it is parsed. A VuoException thrown inside the block is handled by calling
 * stopBecauseLostContact().
 *
 * NOTE(review): the statements that send `messages` and check the reply do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the second message.
 * @param portIdentifier Text placed in the third message.
 * @return The result of json_tokener_parse() on the received text.
 */
1459json_object * VuoRunner::getInputPortValue(string compositionIdentifier, string portIdentifier)
1460{
1462
1463 __block string valueAsString;
1464 dispatch_sync(controlQueue, ^{
1465 if (stopped || lostContact) {
1466 return;
1467 }
1468
1470
1471 try
1472 {
1473 zmq_msg_t messages[3];
1474 vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1475 vuoInitMessageWithString(&messages[1], compositionIdentifier.c_str());
1476 vuoInitMessageWithString(&messages[2], portIdentifier.c_str());
1479 valueAsString = receiveString("null");
1480 }
1481 catch (VuoException &e)
1482 {
1483 stopBecauseLostContact(e.what());
1484 }
1485 });
1486 return json_tokener_parse(valueAsString.c_str());
1487}
1488
/**
 * Requests an output port's value and returns it parsed as JSON.
 *
 * Synchronously on `controlQueue`, three messages are initialized in order: a boolean that is
 * true when the composition is not in the current process, the composition identifier, and the
 * port identifier. The reply text is then read with receiveString("null").
 * The block returns early if the runner is stopped or has lost contact; `valueAsString` is then
 * still empty when it is parsed. A VuoException thrown inside the block is handled by calling
 * stopBecauseLostContact().
 *
 * NOTE(review): the statements that send `messages` and check the reply do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the second message.
 * @param portIdentifier Text placed in the third message.
 * @return The result of json_tokener_parse() on the received text.
 */
1505json_object * VuoRunner::getOutputPortValue(string compositionIdentifier, string portIdentifier)
1506{
1508
1509 __block string valueAsString;
1510 dispatch_sync(controlQueue, ^{
1511 if (stopped || lostContact) {
1512 return;
1513 }
1514
1516
1517 try
1518 {
1519 zmq_msg_t messages[3];
1520 vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1521 vuoInitMessageWithString(&messages[1], compositionIdentifier.c_str());
1522 vuoInitMessageWithString(&messages[2], portIdentifier.c_str());
1525 valueAsString = receiveString("null");
1526 }
1527 catch (VuoException &e)
1528 {
1529 stopBecauseLostContact(e.what());
1530 }
1531 });
1532 return json_tokener_parse(valueAsString.c_str());
1533}
1534
/**
 * Requests a text summary of an input port's value.
 *
 * Synchronously on `controlQueue`, two messages are initialized in order (composition identifier,
 * port identifier) and the reply text is read with receiveString("").
 * The block returns early if the runner is stopped or has lost contact, in which case the
 * returned string is empty. A VuoException thrown inside the block is handled by calling
 * stopBecauseLostContact().
 *
 * NOTE(review): the statements that send `messages` and check the reply do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the first message.
 * @param portIdentifier Text placed in the second message.
 * @return The received text, or an empty string if nothing was received.
 */
1548string VuoRunner::getInputPortSummary(string compositionIdentifier, string portIdentifier)
1549{
1551
1552 __block string summary;
1553 dispatch_sync(controlQueue, ^{
1554 if (stopped || lostContact) {
1555 return;
1556 }
1557
1559
1560 try
1561 {
1562 zmq_msg_t messages[2];
1563 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1564 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1567 summary = receiveString("");
1568 }
1569 catch (VuoException &e)
1570 {
1571 stopBecauseLostContact(e.what());
1572 }
1573 });
1574 return summary;
1575}
1576
/**
 * Requests a text summary of an output port's value.
 *
 * Synchronously on `controlQueue`, two messages are initialized in order (composition identifier,
 * port identifier) and the reply text is read with receiveString("").
 * The block returns early if the runner is stopped or has lost contact, in which case the
 * returned string is empty. A VuoException thrown inside the block is handled by calling
 * stopBecauseLostContact().
 *
 * NOTE(review): the statements that send `messages` and check the reply do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the first message.
 * @param portIdentifier Text placed in the second message.
 * @return The received text, or an empty string if nothing was received.
 */
1590string VuoRunner::getOutputPortSummary(string compositionIdentifier, string portIdentifier)
1591{
1593
1594 __block string summary;
1595 dispatch_sync(controlQueue, ^{
1596 if (stopped || lostContact) {
1597 return;
1598 }
1599
1601
1602 try
1603 {
1604 zmq_msg_t messages[2];
1605 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1606 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1609 summary = receiveString("");
1610 }
1611 catch (VuoException &e)
1612 {
1613 stopBecauseLostContact(e.what());
1614 }
1615 });
1616 return summary;
1617}
1618
/**
 * Prepares the request to subscribe to telemetry for an input port and returns the text received in reply.
 *
 * Synchronously on `controlQueue`, two messages are initialized in order (composition identifier,
 * port identifier) and the reply text is read with receiveString("").
 * The block returns early if the runner is stopped or has lost contact, in which case the
 * returned string is empty. A VuoException thrown inside the block is handled by calling
 * stopBecauseLostContact().
 *
 * NOTE(review): the statements that send `messages` and check the reply do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the first message.
 * @param portIdentifier Text placed in the second message.
 * @return The received text, or an empty string if nothing was received.
 */
1632string VuoRunner::subscribeToInputPortTelemetry(string compositionIdentifier, string portIdentifier)
1633{
1635
1636 __block string summary;
1637 dispatch_sync(controlQueue, ^{
1638 if (stopped || lostContact) {
1639 return;
1640 }
1641
1643
1644 try
1645 {
1646 zmq_msg_t messages[2];
1647 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1648 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1651 summary = receiveString("");
1652 }
1653 catch (VuoException &e)
1654 {
1655 stopBecauseLostContact(e.what());
1656 }
1657 });
1658 return summary;
1659}
1660
/**
 * Prepares the request to subscribe to telemetry for an output port and returns the text received in reply.
 *
 * Synchronously on `controlQueue`, two messages are initialized in order (composition identifier,
 * port identifier) and the reply text is read with receiveString("").
 * The block returns early if the runner is stopped or has lost contact, in which case the
 * returned string is empty. A VuoException thrown inside the block is handled by calling
 * stopBecauseLostContact().
 *
 * NOTE(review): the statements that send `messages` and check the reply do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the first message.
 * @param portIdentifier Text placed in the second message.
 * @return The received text, or an empty string if nothing was received.
 */
1674string VuoRunner::subscribeToOutputPortTelemetry(string compositionIdentifier, string portIdentifier)
1675{
1677
1678 __block string summary;
1679 dispatch_sync(controlQueue, ^{
1680 if (stopped || lostContact) {
1681 return;
1682 }
1683
1685
1686 try
1687 {
1688 zmq_msg_t messages[2];
1689 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1690 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1693 summary = receiveString("");
1694 }
1695 catch (VuoException &e)
1696 {
1697 stopBecauseLostContact(e.what());
1698 }
1699 });
1700 return summary;
1701}
1702
/**
 * Prepares the request to unsubscribe from telemetry for an input port.
 *
 * Synchronously on `controlQueue`, two messages are initialized in order: the composition
 * identifier and the port identifier. The block returns early if the runner is stopped or
 * has lost contact. A VuoException thrown inside the block is handled by calling
 * stopBecauseLostContact().
 *
 * NOTE(review): the statements that actually send `messages` do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the first message.
 * @param portIdentifier Text placed in the second message.
 */
1713void VuoRunner::unsubscribeFromInputPortTelemetry(string compositionIdentifier, string portIdentifier)
1714{
1716
1717 dispatch_sync(controlQueue, ^{
1718 if (stopped || lostContact) {
1719 return;
1720 }
1721
1723
1724 try
1725 {
1726 zmq_msg_t messages[2];
1727 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1728 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1731 }
1732 catch (VuoException &e)
1733 {
1734 stopBecauseLostContact(e.what());
1735 }
1736 });
1737}
1738
/**
 * Prepares the request to unsubscribe from telemetry for an output port.
 *
 * Synchronously on `controlQueue`, two messages are initialized in order: the composition
 * identifier and the port identifier. The block returns early if the runner is stopped or
 * has lost contact. A VuoException thrown inside the block is handled by calling
 * stopBecauseLostContact().
 *
 * NOTE(review): the statements that actually send `messages` do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the first message.
 * @param portIdentifier Text placed in the second message.
 */
1749void VuoRunner::unsubscribeFromOutputPortTelemetry(string compositionIdentifier, string portIdentifier)
1750{
1752
1753 dispatch_sync(controlQueue, ^{
1754 if (stopped || lostContact) {
1755 return;
1756 }
1757
1759
1760 try
1761 {
1762 zmq_msg_t messages[2];
1763 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1764 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1767 }
1768 catch (VuoException &e)
1769 {
1770 stopBecauseLostContact(e.what());
1771 }
1772 });
1773}
1774
/**
 * Prepares the request to subscribe to event telemetry for a composition.
 *
 * Synchronously on `controlQueue`, one message is initialized with the composition identifier.
 * The block returns early if the runner is stopped or has lost contact. A VuoException thrown
 * inside the block is handled by calling stopBecauseLostContact().
 *
 * NOTE(review): the statements that actually send `messages` do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the only message.
 */
1785void VuoRunner::subscribeToEventTelemetry(string compositionIdentifier)
1786{
1788
1789 dispatch_sync(controlQueue, ^{
1790 if (stopped || lostContact) {
1791 return;
1792 }
1793
1795
1796 try
1797 {
1798 zmq_msg_t messages[1];
1799 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1802 }
1803 catch (VuoException &e)
1804 {
1805 stopBecauseLostContact(e.what());
1806 }
1807 });
1808}
1809
/**
 * Prepares the request to unsubscribe from event telemetry for a composition.
 *
 * Synchronously on `controlQueue`, one message is initialized with the composition identifier.
 * The block returns early if the runner is stopped or has lost contact. A VuoException thrown
 * inside the block is handled by calling stopBecauseLostContact().
 *
 * NOTE(review): the statements that actually send `messages` do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the only message.
 */
1822void VuoRunner::unsubscribeFromEventTelemetry(string compositionIdentifier)
1823{
1825
1826 dispatch_sync(controlQueue, ^{
1827 if (stopped || lostContact) {
1828 return;
1829 }
1830
1832
1833 try
1834 {
1835 zmq_msg_t messages[1];
1836 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1839 }
1840 catch (VuoException &e)
1841 {
1842 stopBecauseLostContact(e.what());
1843 }
1844 });
1845}
1846
/**
 * Prepares the request to subscribe to all telemetry for a composition.
 *
 * Synchronously on `controlQueue`, one message is initialized with the composition identifier.
 * The block returns early if the runner is stopped or has lost contact. A VuoException thrown
 * inside the block is handled by calling stopBecauseLostContact().
 *
 * NOTE(review): the statements that actually send `messages` do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the only message.
 */
1857void VuoRunner::subscribeToAllTelemetry(string compositionIdentifier)
1858{
1860
1861 dispatch_sync(controlQueue, ^{
1862 if (stopped || lostContact) {
1863 return;
1864 }
1865
1867
1868 try
1869 {
1870 zmq_msg_t messages[1];
1871 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1874 }
1875 catch (VuoException &e)
1876 {
1877 stopBecauseLostContact(e.what());
1878 }
1879 });
1880}
1881
/**
 * Prepares the request to unsubscribe from all telemetry for a composition.
 *
 * Synchronously on `controlQueue`, one message is initialized with the composition identifier.
 * The block returns early if the runner is stopped or has lost contact. A VuoException thrown
 * inside the block is handled by calling stopBecauseLostContact().
 *
 * NOTE(review): the statements that actually send `messages` do not appear in this listing — confirm against the original file.
 *
 * @param compositionIdentifier Text placed in the only message.
 */
1894void VuoRunner::unsubscribeFromAllTelemetry(string compositionIdentifier)
1895{
1897
1898 dispatch_sync(controlQueue, ^{
1899 if (stopped || lostContact) {
1900 return;
1901 }
1902
1904
1905 try
1906 {
1907 zmq_msg_t messages[1];
1908 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1911 }
1912 catch (VuoException &e)
1913 {
1914 stopBecauseLostContact(e.what());
1915 }
1916 });
1917}
1918
/**
 * Prepares the control request that carries new values for a set of published input ports.
 *
 * First pass (on the calling thread): remembers the most recent image dimensions in
 * `p->lastWidth` / `p->lastHeight`. A port named "width" or "height" supplies the value directly;
 * a port named "image" or "startImage" supplies it through its "pixelsWide" / "pixelsHigh" keys,
 * or through the VuoImage whose address is stored under its "pointer" key.
 *
 * Second pass (synchronously on `controlQueue`): initializes two messages per map entry, the
 * port name followed by the value serialized as plain JSON. The block returns early if the
 * runner is stopped or has lost contact. A VuoException thrown inside the block is handled by
 * calling stopBecauseLostContact().
 *
 * NOTE(review): some statements of this function (including those that send `messages`) do not appear in this listing — confirm against the original file.
 *
 * @param portsAndValuesToSet Map from published input port to the JSON value to give it.
 */
1932void VuoRunner::setPublishedInputPortValues(map<Port *, json_object *> portsAndValuesToSet)
1933{
1935
1937 for (auto i : portsAndValuesToSet)
1938 {
1939 string portName = i.first->getName();
1940 if (portName == "width")
1941 p->lastWidth = json_object_get_int64(i.second);
1942 else if (portName == "height")
1943 p->lastHeight = json_object_get_int64(i.second);
1944 else if (portName == "image" || portName == "startImage")
1945 {
1946 json_object *o;
1947 if (json_object_object_get_ex(i.second, "pixelsWide", &o))
1948 p->lastWidth = json_object_get_int64(o);
1949 if (json_object_object_get_ex(i.second, "pixelsHigh", &o))
1950 p->lastHeight = json_object_get_int64(o);
1951 if (json_object_object_get_ex(i.second, "pointer", &o))
1952 {
 // The "pointer" value is reinterpreted as an in-process VuoImage address; its dimensions override the ones read above.
1953 VuoImage vi = (VuoImage)json_object_get_int64(o);
1954 p->lastWidth = vi->pixelsWide;
1955 p->lastHeight = vi->pixelsHigh;
1956 }
1957 }
1958 }
1959
1960 dispatch_sync(controlQueue, ^{
1961 if (stopped || lostContact) {
1962 return;
1963 }
1964
1966
1967 try
1968 {
1969 int messageCount = portsAndValuesToSet.size() * 2;
 // NOTE(review): when portsAndValuesToSet is empty this declares a zero-length variable-length array — confirm callers never pass an empty map.
1970 zmq_msg_t messages[messageCount];
1971
1972 int i = 0;
1973 for (auto &kv : portsAndValuesToSet)
1974 {
1975 vuoInitMessageWithString(&messages[i++], kv.first->getName().c_str());
1976 vuoInitMessageWithString(&messages[i++], json_object_to_json_string_ext(kv.second, JSON_C_TO_STRING_PLAIN));
1977 }
1978
1981 }
1982 catch (VuoException &e)
1983 {
1984 stopBecauseLostContact(e.what());
1985 }
1986 });
1987}
1988
1997{
1999
2000 set<VuoRunner::Port *> portAsSet;
2001 portAsSet.insert(port);
2002 firePublishedInputPortEvent(portAsSet);
2003}
2004
/**
 * Prepares the control request for firing an event into a set of published input ports.
 *
 * Synchronously on `controlQueue`: clears `lastFiredEventSignaled`, then initializes
 * `ports.size() + 1` messages, first the number of ports and then each port's name.
 * The block returns early if the runner is stopped or has lost contact. A VuoException thrown
 * inside the block is handled by calling stopBecauseLostContact().
 *
 * NOTE(review): the statements that actually send `messages` do not appear in this listing — confirm against the original file.
 *
 * @param ports The published input ports whose names are placed in the messages.
 */
2015void VuoRunner::firePublishedInputPortEvent(const set<Port *> &ports)
2016{
2018
2019 dispatch_sync(controlQueue, ^{
2020 if (stopped || lostContact) {
2021 return;
2022 }
2023
2025
2026 lastFiredEventSignaled = false;
2027
2028 try
2029 {
2030 size_t messageCount = ports.size() + 1;
2031 zmq_msg_t messages[messageCount];
2032
2033 vuoInitMessageWithInt(&messages[0], ports.size());
2034 int i = 1;
2035 for (VuoRunner::Port *port : ports) {
2036 vuoInitMessageWithString(&messages[i++], port->getName().c_str());
2037 }
2038
2041 }
2042 catch (VuoException &e)
2043 {
2044 stopBecauseLostContact(e.what());
2045 }
2046 });
2047}
2048
2074{
2076
2077 saturating_semaphore_wait(lastFiredEventSemaphore, &lastFiredEventSignaled);
2078}
2079
2094{
2096
2097 __block string valueAsString;
2098 dispatch_sync(controlQueue, ^{
2099 if (stopped || lostContact) {
2100 return;
2101 }
2102
2104
2105 try
2106 {
2107 zmq_msg_t messages[2];
2108 vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
2109 vuoInitMessageWithString(&messages[1], port->getName().c_str());
2112 valueAsString = receiveString("null");
2113 }
2114 catch (VuoException &e)
2115 {
2116 stopBecauseLostContact(e.what());
2117 }
2118 });
2119 return json_tokener_parse(valueAsString.c_str());
2120}
2121
2136{
2138
2139 __block string valueAsString;
2140 dispatch_sync(controlQueue, ^{
2141 if (stopped || lostContact) {
2142 return;
2143 }
2144
2146
2147 try
2148 {
2149 zmq_msg_t messages[2];
2150 vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
2151 vuoInitMessageWithString(&messages[1], port->getName().c_str());
2154 valueAsString = receiveString("null");
2155 }
2156 catch (VuoException &e)
2157 {
2158 stopBecauseLostContact(e.what());
2159 }
2160 });
2161
2162 // https://b33p.net/kosada/node/17535
2163 json_object *js = json_tokener_parse(valueAsString.c_str());
2164 if (VuoRunner_isHostVDMX && port->getName() == "outputImage")
2165 {
2166 json_object *o;
2167 uint64_t actualWidth = 0;
2168 if (json_object_object_get_ex(js, "pixelsWide", &o))
2169 actualWidth = json_object_get_int64(o);
2170 uint64_t actualHeight = 0;
2171 if (json_object_object_get_ex(js, "pixelsHigh", &o))
2172 actualHeight = json_object_get_int64(o);
2173 if (json_object_object_get_ex(js, "pointer", &o))
2174 {
2175 VuoImage vi = (VuoImage)json_object_get_int64(o);
2176 actualWidth = vi->pixelsWide;
2177 actualHeight = vi->pixelsHigh;
2178 }
2179
2180 if (p->lastWidth && p->lastHeight
2181 && (actualWidth != p->lastWidth || actualHeight != p->lastHeight))
2182 {
2183 call_once(p->vuoImageFunctionsInitialized, [=](){
2184 p->vuoImageMakeFromJsonWithDimensions = (Private::vuoImageMakeFromJsonWithDimensionsType)dlsym(RTLD_SELF, "VuoImage_makeFromJsonWithDimensions");
2185 if (!p->vuoImageMakeFromJsonWithDimensions)
2186 {
2187 VUserLog("Error: Couldn't find VuoImage_makeFromJsonWithDimensions.");
2188 return;
2189 }
2190
2191 p->vuoImageGetInterprocessJson = (Private::vuoImageGetInterprocessJsonType)dlsym(RTLD_SELF, "VuoImage_getInterprocessJson");
2193 {
2194 VUserLog("Error: Couldn't find VuoImage_getInterprocessJson.");
2195 return;
2196 }
2197 });
2198
2200 {
2202 json_object *jsResized = p->vuoImageGetInterprocessJson(vi);
2203 json_object_put(js);
2204 return jsResized;
2205 }
2206 }
2207 }
2208
2209 return js;
2210}
2211
2226vector<VuoRunner::Port *> VuoRunner::getCachedPublishedPorts(bool input)
2227{
2228 // Caching not only provides faster access (without zmq messages),
2229 // but also ensures that the VuoRunner::Port pointers passed to
2230 // VuoRunnerDelegate::receivedTelemetryPublishedOutputPortUpdated are consistent.
2231
2232 if (input)
2233 {
2234 if (! arePublishedInputPortsCached)
2235 {
2236 publishedInputPorts = refreshPublishedPorts(true);
2237 arePublishedInputPortsCached = true;
2238 }
2239 return publishedInputPorts;
2240 }
2241 else
2242 {
2243 if (! arePublishedOutputPortsCached)
2244 {
2245 publishedOutputPorts = refreshPublishedPorts(false);
2246 arePublishedOutputPortsCached = true;
2247 }
2248 return publishedOutputPorts;
2249 }
2250}
2251
/**
 * Asks the composition for its published ports and builds a new list of Port objects.
 *
 * A dispatch timer guards the exchange: it first fires 5 seconds after being armed, and its
 * handler calls stopBecauseLostContact() and cancels the timer.
 *
 * Three request/reply rounds are made, each followed by receiveListOfStrings(); the lists
 * are taken, in order, as port names, port types, and port details (JSON text). One Port is
 * allocated with `new` per index that exists in all three lists.
 *
 * The timer is cancelled and released both on normal return and when an exception propagates;
 * exceptions are rethrown to the caller.
 *
 * NOTE(review): the statements that fill `requests` and `replies` for the input and output cases do not appear in this listing — confirm against the original file.
 *
 * @param input Selects which of the two (not shown) sets of requests and replies is used.
 * @return The newly allocated ports; nothing in this function frees them.
 */
2263vector<VuoRunner::Port *> VuoRunner::refreshPublishedPorts(bool input)
2264{
2265 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
2266 dispatch_source_t timeout = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
2267 dispatch_source_set_timer(timeout, dispatch_time(DISPATCH_TIME_NOW, 5*NSEC_PER_SEC), NSEC_PER_SEC, NSEC_PER_SEC/10);
2268 dispatch_source_set_event_handler(timeout, ^{
2269 stopBecauseLostContact("The connection between the composition and runner timed out when trying to receive the list of published ports");
2270 dispatch_source_cancel(timeout);
2271 });
2272 dispatch_resume(timeout);
2273
2274 vector<VuoRunner::Port *> ports;
2275
2276 try
2277 {
2279
2280 enum VuoControlRequest requests[4];
2281 enum VuoControlReply replies[4];
2282 if (input)
2283 {
2290 }
2291 else
2292 {
2299 }
2300
2301 vector<string> names;
2302 vector<string> types;
2303 vector<string> details;
2304
 // Only the first three entries of requests/replies are used by this loop.
2305 for (int i = 0; i < 3; ++i)
2306 {
2307 vuoControlRequestSend(requests[i], NULL, 0);
2308 vuoControlReplyReceive(replies[i]);
2309 vector<string> messageStrings = receiveListOfStrings();
2310 if (i == 0)
2311 names = messageStrings;
2312 else if (i == 1)
2313 types = messageStrings;
2314 else
2315 details = messageStrings;
2316 }
2317
 // Stop at the shortest of the three lists so that every index is valid in all of them.
2318 for (size_t i = 0; i < names.size() && i < types.size() && i < details.size(); ++i)
2319 {
2320 VuoRunner::Port *port = new Port(names[i], types[i], json_tokener_parse(details[i].c_str()));
2321 ports.push_back(port);
2322 }
2323 }
2324 catch (...)
2325 {
2326 dispatch_source_cancel(timeout);
2327 dispatch_release(timeout);
2328 throw;
2329 }
2330
2331 dispatch_source_cancel(timeout);
2332 dispatch_release(timeout);
2333
2334 return ports;
2335}
2336
/**
 * Returns the published input ports, as provided by getCachedPublishedPorts(true).
 *
 * NOTE(review): one statement at the top of the body does not appear in this listing — confirm against the original file.
 */
2346vector<VuoRunner::Port *> VuoRunner::getPublishedInputPorts(void)
2347{
2349
2350 return getCachedPublishedPorts(true);
2351}
2352
/**
 * Returns the published output ports, as provided by getCachedPublishedPorts(false).
 *
 * NOTE(review): one statement at the top of the body does not appear in this listing — confirm against the original file.
 */
2362vector<VuoRunner::Port *> VuoRunner::getPublishedOutputPorts(void)
2363{
2365
2366 return getCachedPublishedPorts(false);
2367}
2368
2379{
2381
2382 vector<VuoRunner::Port *> inputPorts = getPublishedInputPorts();
2383 for (vector<VuoRunner::Port *>::iterator i = inputPorts.begin(); i != inputPorts.end(); ++i)
2384 if ((*i)->getName() == name)
2385 return *i;
2386
2387 return NULL;
2388}
2389
2400{
2402
2403 vector<VuoRunner::Port *> outputPorts = getPublishedOutputPorts();
2404 for (vector<VuoRunner::Port *>::iterator i = outputPorts.begin(); i != outputPorts.end(); ++i)
2405 if ((*i)->getName() == name)
2406 return *i;
2407
2408 return NULL;
2409}
2410
/**
 * Receives telemetry from the composition and forwards it to the delegate until listening is canceled.
 *
 * Steps visible in this function:
 *  - Names the calling thread "org.vuo.runner.telemetry: " followed by the file name taken from `dylibPath` (or `executablePath` when `dylibPath` is empty).
 *  - Creates two ZMQ_PAIR sockets on "inproc://vuo-runner-self": `ZMQSelfReceive` (bound) and `ZMQSelfSend` (connected).
 *  - Creates the ZMQ_SUB socket `ZMQTelemetry`, connects it to `ZMQTelemetryURL`, sets its receive high-water mark to 0, and subscribes to telemetry types.
 *  - Blocks in zmq_poll() with no timeout until `ZMQTelemetry` is readable, then signals `beganListeningSemaphore`.
 *  - Loops until `listenCanceled` is set, polling both sockets with a 1000 ms timeout (100 ms once a stop-listening message has been received).
 *    Telemetry messages are decoded by type and passed to the delegate synchronously on `delegateQueue`.
 *    A message on `ZMQSelfReceive` sets `pendingCancel`; the next poll that delivers no telemetry then ends the loop.
 *    A plain timeout ends the loop and calls stopBecauseLostContact(), unless one of the "try again" conditions logged below applies.
 *  - Closes the three sockets and signals `endedListeningSemaphore`.
 *
 * If a socket setup step fails, `listenError` is set from `errno`, `beganListeningSemaphore` is signaled, and the function returns.
 *
 * NOTE(review): several `case` labels, some `type = ...` assignments, and one `if` condition do not appear in this listing — confirm against the original file.
 * NOTE(review): the early-return paths neither close the sockets already created nor signal `endedListeningSemaphore` — confirm that callers do not wait on it after a setup failure.
 */
2422void VuoRunner::listen()
2423{
2424 // Name this thread.
2425 {
2426 const char *compositionName = dylibPath.empty() ? executablePath.c_str() : dylibPath.c_str();
2427
2428 // Trim the path, if present.
2429 if (const char *lastSlash = strrchr(compositionName, '/'))
2430 compositionName = lastSlash + 1;
2431
2432 char threadName[MAXTHREADNAMESIZE];
2433 snprintf(threadName, MAXTHREADNAMESIZE, "org.vuo.runner.telemetry: %s", compositionName);
2434 pthread_setname_np(threadName);
2435 }
2436
2437 ZMQSelfReceive = zmq_socket(ZMQContext, ZMQ_PAIR);
2438 VuoRunner_configureSocket(ZMQSelfReceive);
2439 if (zmq_bind(ZMQSelfReceive, "inproc://vuo-runner-self") != 0)
2440 {
2441 listenError = strerror(errno);
2442 dispatch_semaphore_signal(beganListeningSemaphore);
2443 return;
2444 }
2445
2446 ZMQSelfSend = zmq_socket(ZMQContext, ZMQ_PAIR);
2447 VuoRunner_configureSocket(ZMQSelfSend);
2448 if (zmq_connect(ZMQSelfSend, "inproc://vuo-runner-self") != 0)
2449 {
2450 listenError = strerror(errno);
2451 dispatch_semaphore_signal(beganListeningSemaphore);
2452 return;
2453 }
2454
2455 {
2456 ZMQTelemetry = zmq_socket(ZMQContext,ZMQ_SUB);
2457 VuoRunner_configureSocket(ZMQTelemetry);
2458 if(zmq_connect(ZMQTelemetry,ZMQTelemetryURL.c_str()))
2459 {
2460 listenError = strerror(errno);
2461 dispatch_semaphore_signal(beganListeningSemaphore);
2462 return;
2463 }
2464
2465 const int highWaterMark = 0; // no limit
2466 if(zmq_setsockopt(ZMQTelemetry,ZMQ_RCVHWM,&highWaterMark,sizeof(highWaterMark)))
2467 {
2468 listenError = strerror(errno);
2469 dispatch_semaphore_signal(beganListeningSemaphore);
2470 return;
2471 }
2472 }
2473
2474 {
2475 // subscribe to all types of telemetry
2476 char type = VuoTelemetryHeartbeat;
2477 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2479 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2481 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2483 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2485 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2487 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2489 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2491 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2492 type = VuoTelemetryError;
2493 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2495 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2496 }
2497
2498 {
2499 // Wait until the connection is established, as evidenced by a heartbeat telemetry message
2500 // being received from the composition. This is necessary because the ØMQ API doesn't provide
2501 // any way to tell when a SUB socket is ready to receive messages, and if you call zmq_poll()
2502 // on it before it's ready, then it might miss messages that came in while it was still trying
2503 // to get ready. (The zmq_connect() function doesn't make any guarantees about the socket being ready.
2504 // It just starts some setup that may continue asynchronously after zmq_connect() has returned.)
2505 // To avoid missing important telemetry messages from the composition, we make sure that the
2506 // runner doesn't tell the composition to unpause until the runner has verified that it's
2507 // receiving heartbeat telemetry messages. http://zguide.zeromq.org/page:all#Node-Coordination
2508 zmq_pollitem_t items[]=
2509 {
2510 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2511 };
2512 int itemCount = 1;
2513 long timeout = -1;
2514 zmq_poll(items,itemCount,timeout);
2515 }
2516
2517 dispatch_semaphore_signal(beganListeningSemaphore);
2518
2519 bool pendingCancel = false;
2520 while(! listenCanceled)
2521 {
2522 zmq_pollitem_t items[]=
2523 {
2524 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2525 {ZMQSelfReceive,0,ZMQ_POLLIN,0},
2526 };
2527 int itemCount = 2;
2528
2529 // Wait 1 second. If no telemetry was received in that second, we probably lost contact with the composition.
2530 long timeout = pendingCancel ? 100 : 1000;
2531 zmq_poll(items,itemCount,timeout);
2532 if(items[0].revents & ZMQ_POLLIN)
2533 {
2534 // Receive telemetry type.
2535 char type = vuoReceiveInt(ZMQTelemetry, NULL);
2536
2537 // Receive telemetry arguments and forward to VuoRunnerDelegate.
2538 switch (type)
2539 {
2541 {
2542 dispatch_sync(delegateQueue, ^{
2543 if (delegate)
2544 delegate->receivedTelemetryStats(0, 0);
2545 });
2546 break;
2547 }
2549 {
2550 char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2551 char *nodeIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2552 dispatch_sync(delegateQueue, ^{
2553 if (delegate)
2554 delegate->receivedTelemetryNodeExecutionStarted(compositionIdentifier, nodeIdentifier);
2555 });
2556 free(compositionIdentifier);
2557 free(nodeIdentifier);
2558 break;
2559 }
2561 {
2562 char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2563 char *nodeIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2564 dispatch_sync(delegateQueue, ^{
2565 if (delegate)
2566 delegate->receivedTelemetryNodeExecutionFinished(compositionIdentifier, nodeIdentifier);
2567 });
2568 free(compositionIdentifier);
2569 free(nodeIdentifier);
2570 break;
2571 }
2573 {
2574 while (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2575 {
2576 char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2577 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2578 {
2579 char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2580 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2581 {
2582 bool receivedEvent = vuoReceiveBool(ZMQTelemetry, NULL);
2583 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2584 {
2585 bool receivedData = vuoReceiveBool(ZMQTelemetry, NULL);
2586 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2587 {
2588 string portDataSummary;
2589 char *s = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2590 if (s)
2591 {
2592 portDataSummary = s;
2593 free(s);
2594 }
2595 else
2596 portDataSummary = "";
2597
2598 dispatch_sync(delegateQueue, ^{
2599 if (delegate)
2600 delegate->receivedTelemetryInputPortUpdated(compositionIdentifier, portIdentifier, receivedEvent, receivedData, portDataSummary);
2601 });
2602 }
2603 }
2604 }
2605 free(portIdentifier);
2606 }
2607 free(compositionIdentifier);
2608 }
2609 break;
2610 }
2612 {
2613 while (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2614 {
2615 char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2616 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2617 {
2618 char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2619 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2620 {
2621 bool sentEvent = vuoReceiveBool(ZMQTelemetry, NULL);
2622 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2623 {
2624 bool sentData = vuoReceiveBool(ZMQTelemetry, NULL);
2625 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2626 {
2627 string portDataSummary;
2628 char *s = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2629 if (s)
2630 {
2631 portDataSummary = s;
2632 free(s);
2633 }
2634 else
2635 portDataSummary = "";
2636
2637 dispatch_sync(delegateQueue, ^{
2638 if (delegate)
2639 delegate->receivedTelemetryOutputPortUpdated(compositionIdentifier, portIdentifier, sentEvent, sentData, portDataSummary);
2640 });
2641 }
2642 }
2643 }
2644 free(portIdentifier);
2645 }
2646 free(compositionIdentifier);
2647 }
2648 break;
2649 }
2651 {
2652 while (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2653 {
2654 char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2655 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2656 {
2657 bool sentData = vuoReceiveBool(ZMQTelemetry, NULL);
2658 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2659 {
2660 string portDataSummary;
2661 char *s = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2662 if (s)
2663 {
2664 portDataSummary = s;
2665 free(s);
2666 }
2667 else
2668 portDataSummary = "";
2669
2670 Port *port = getPublishedOutputPortWithName(portIdentifier);
2671
2672 dispatch_sync(delegateQueue, ^{
2673 if (delegate)
2674 delegate->receivedTelemetryPublishedOutputPortUpdated(port, sentData, portDataSummary);
2675 });
2676 }
2677 }
2678 free(portIdentifier);
2679 }
2680 break;
2681 }
2683 {
2684 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2685 break;
2686 }
2688 {
2689 char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2690 char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2691 dispatch_sync(delegateQueue, ^{
2692 if (delegate)
2693 delegate->receivedTelemetryEventDropped(compositionIdentifier, portIdentifier);
2694 });
2695 free(compositionIdentifier);
2696 free(portIdentifier);
2697 break;
2698 }
2699 case VuoTelemetryError:
2700 {
2701 char *message = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2702 dispatch_sync(delegateQueue, ^{
2703 if (delegate)
 // NOTE(review): `message` is not checked for NULL before building a string from it, unlike the `s` strings in the port-updated branches — confirm it cannot be NULL here.
2704 delegate->receivedTelemetryError( string(message) );
2705 });
2706 free(message);
2707 break;
2708 }
2710 {
2711 dispatch_sync(delegateQueue, ^{
2712 if (delegate)
2713 delegate->lostContactWithComposition();
2714 });
2715 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2716 stop();
2717 });
2718 break;
2719 }
2720 default:
2721 VUserLog("Error: Unknown telemetry message type: %d", type);
2722 break;
2723 }
2724 }
2725 else if (! listenCanceled) // Either the 1-second timeout elapsed, or we got a stop-listening message from ZMQSelfSend
2726 {
2727 if (items[1].revents & ZMQ_POLLIN)
2728 {
2729 // This is a stop-listening message.
2730 vuoReceiveInt(ZMQSelfReceive, NULL);
2731
2732 // Drain any remaining telemetry messages.
2733 pendingCancel = true;
2734 }
2735
2736 else if (pendingCancel)
2737 listenCanceled = true;
2738
2739 else
2740 {
2741 // Timeout.
2742 // Could happen if the composition crashed, or if the system fell asleep then hibernated (standby mode).
2743 // If it's a crash we should disconnect; if it's hibernation we should ignore the timeout and try zmq_poll again.
2745 VDebugLog("zmq_poll timed out, but system is sleeping so I'll try again.");
2746 else if (VuoLog_isDebuggerAttached())
2747 VDebugLog("zmq_poll timed out, but a debugger is attached to the host so I'll try again.");
2748 else
2749 {
2750 listenCanceled = true;
2751 string dir, file, ext;
2752 VuoFileUtilities::splitPath(executablePath, dir, file, ext);
2753 stopBecauseLostContact("The connection between the composition ('" + file + "') and runner timed out while listening for telemetry.");
2754 }
2755 }
2756 }
2757 }
2758
2759 zmq_close(ZMQTelemetry);
2760 ZMQTelemetry = NULL;
2761
2762 zmq_close(ZMQSelfSend);
2763 ZMQSelfSend = NULL;
2764 zmq_close(ZMQSelfReceive);
2765 ZMQSelfReceive = NULL;
2766
2767 dispatch_semaphore_signal(endedListeningSemaphore);
2768 return;
2769}
2770
2780void VuoRunner::vuoControlRequestSend(enum VuoControlRequest request, zmq_msg_t *messages, unsigned int messageCount)
2781{
2782 char *error = NULL;
2783 vuoSend("runner VuoControl",ZMQControl,request,messages,messageCount,false,&error);
2784
2785 if (error)
2786 {
2787 string e(error);
2788 free(error);
2789 throw VuoException(e);
2790 }
2791}
2792
2802void VuoRunner::vuoLoaderControlRequestSend(enum VuoLoaderControlRequest request, zmq_msg_t *messages, unsigned int messageCount)
2803{
2804 char *error = NULL;
2805 vuoSend("runner VuoLoaderControl",ZMQLoaderControl,request,messages,messageCount,false,&error);
2806
2807 if (error)
2808 {
2809 string e(error);
2810 free(error);
2811 throw VuoException(e);
2812 }
2813}
2814
2823void VuoRunner::vuoControlReplyReceive(enum VuoControlReply expectedReply)
2824{
2825 char *error = NULL;
2826 int reply = vuoReceiveInt(ZMQControl, &error);
2827
2828 if (error)
2829 {
2830 string e(error);
2831 free(error);
2832 ostringstream oss;
2833 oss << e << " (expected " << expectedReply << ")";
2834 throw VuoException(oss.str());
2835 }
2836 else if (reply != expectedReply)
2837 {
2838 ostringstream oss;
2839 oss << "The runner received the wrong message from the composition (expected " << expectedReply << ", received " << reply << ")";
2840 throw VuoException(oss.str());
2841 }
2842}
2843
2852void VuoRunner::vuoLoaderControlReplyReceive(enum VuoLoaderControlReply expectedReply)
2853{
2854 char *error = NULL;
2855 int reply = vuoReceiveInt(ZMQLoaderControl, &error);
2856
2857 if (error)
2858 {
2859 string e(error);
2860 free(error);
2861 ostringstream oss;
2862 oss << e << " (expected " << expectedReply << ")";
2863 throw VuoException(oss.str());
2864 }
2865 else if (reply != expectedReply)
2866 {
2867 ostringstream oss;
2868 oss << "The runner received the wrong message from the composition loader (expected " << expectedReply << ", received " << reply << ")";
2869 throw VuoException(oss.str());
2870 }
2871}
2872
2878string VuoRunner::receiveString(string fallbackIfNull)
2879{
2880 char *error = NULL;
2881 char *s = vuoReceiveAndCopyString(ZMQControl, &error);
2882
2883 if (error)
2884 {
2885 string e(error);
2886 free(error);
2887 throw VuoException(e);
2888 }
2889
2890 string ret;
2891 if (s)
2892 {
2893 ret = s;
2894 free(s);
2895 }
2896 else
2897 ret = fallbackIfNull;
2898
2899 return ret;
2900}
2901
/**
 * Collects strings read with receiveString("") into a list and returns it.
 *
 * NOTE(review): the statement that governs the braced block (presumably a loop header) does not appear in this listing — confirm against the original file.
 *
 * @return The collected strings.
 */
2905vector<string> VuoRunner::receiveListOfStrings(void)
2906{
2907 vector<string> messageStrings;
2909 {
2910 string s = receiveString("");
2911 messageStrings.push_back(s);
2912 }
2913 return messageStrings;
2914}
2915
2921void VuoRunner::saturating_semaphore_signal(dispatch_semaphore_t dsema, bool *signaled)
2922{
2923 if (__sync_bool_compare_and_swap(signaled, false, true))
2924 dispatch_semaphore_signal(dsema);
2925}
2926
2932void VuoRunner::saturating_semaphore_wait(dispatch_semaphore_t dsema, bool *signaled)
2933{
2934 *signaled = false;
2935 dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
2936}
2937
2942{
2944
2945 return stopped;
2946}
2947
2951bool VuoRunner::isInCurrentProcess(void)
2952{
2953 return executablePath.empty();
2954}
2955
2960bool VuoRunner::isUsingCompositionLoader(void)
2961{
2962 return ! executablePath.empty() && ! dylibPath.empty();
2963}
2964
2969{
2971
2972 dispatch_sync(delegateQueue, ^{
2973 this->delegate = delegate;
2974 });
2975}
2976
/**
 * Records that contact with the composition was lost and starts shutting the runner down.
 *
 * On `delegateQueue`, the previous value of `lostContact` is read and the flag is set.
 * If it was already set, the function returns without doing anything else. Otherwise it:
 *  - signals `lastFiredEventSemaphore` through saturating_semaphore_signal();
 *  - calls the delegate's lostContactWithComposition() on `delegateQueue`, if a delegate is set;
 *  - schedules stop() asynchronously on a global queue;
 *  - when the composition is not in the current process, schedules termination of `ZMQContext`
 *    on a global queue, followed by a signal on `terminatedZMQContextSemaphore`;
 *  - logs @a errorMessage.
 *
 * NOTE(review): one statement at the start of the ZMQ-termination block does not appear in this listing — confirm against the original file.
 *
 * @param errorMessage The text to log.
 */
2980void VuoRunner::stopBecauseLostContact(string errorMessage)
2981{
2982 __block bool alreadyLostContact;
2983 dispatch_sync(delegateQueue, ^{
2984 alreadyLostContact = lostContact;
2985 lostContact = true;
2986 });
2987
2988 if (alreadyLostContact)
2989 return;
2990
2991 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2992
2993 dispatch_sync(delegateQueue, ^{
2994 if (delegate)
2995 delegate->lostContactWithComposition();
2996 });
2997
2998 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2999 stop();
3000 });
3001
3002 if (! isInCurrentProcess())
3003 {
3004 // Normally, stop() is responsible for terminating the ZMQ context.
3005 // But, if stopBecauseLostContact() is called, it takes the responsibility away from stop().
3006 // If there's an in-progress zmq_recv() call, stop() will get stuck waiting on controlQueue, so
3007 // the below call to terminate the ZMQ context interrupts zmq_recv() and allows stop() to proceed.
3008 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
3010
3011 zmq_term(ZMQContext);
3012 ZMQContext = NULL;
3013 dispatch_semaphore_signal(terminatedZMQContextSemaphore);
3014 });
3015 }
3016
3017 VUserLog("%s", errorMessage.c_str());
3018}
3019
3026{
3028
3029 return compositionPid;
3030}
3031
3039VuoRunner::Port::Port(string name, string type, json_object *details)
3040{
3041 this->name = name;
3042 this->type = type;
3043 this->details = details;
3044}
3045
3050{
3051 return name;
3052}
3053
3058{
3059 return type;
3060}
3061
3090{
3091 return details;
3092}
3093
3094VuoRunnerDelegate::~VuoRunnerDelegate() { } // Fixes "undefined symbols" error