Vuo  2.4.0
VuoRunner.cc
Go to the documentation of this file.
1
10#include "VuoRunner.hh"
11#include "VuoFileUtilities.hh"
12#include "VuoStringUtilities.hh"
13#include "VuoEventLoop.h"
14#include "VuoException.hh"
15#include "VuoRuntime.h"
17
18#include <CoreServices/CoreServices.h>
19#include <pthread.h>
20#include <stdio.h>
21#include <dlfcn.h>
22#include <sstream>
23#include <copyfile.h>
24#include <mach-o/loader.h>
25#include <sys/proc_info.h>
26#include <sys/stat.h>
27
28void *VuoApp_mainThread = NULL;
29static const char *mainThreadChecker = "/Applications/Xcode.app/Contents/Developer/usr/lib/libMainThreadChecker.dylib";
31static bool VuoRunner_isHostVDMX = false;
32
37static void VuoRunner_closeOnExec(int fd)
38{
39 int flags = fcntl(fd, F_GETFD);
40 if (flags < 0)
41 {
42 VUserLog("Error: Couldn't get flags for desciptor %d: %s", fd, strerror(errno));
43 return;
44 }
45
46 flags |= FD_CLOEXEC;
47
48 if (fcntl(fd, F_SETFD, flags) != 0)
49 VUserLog("Error: Couldn't set FD_CLOEXEC on descriptor %d: %s", fd, strerror(errno));
50}
51
55static void __attribute__((constructor)) VuoRunner_init()
56{
57 VuoApp_mainThread = (void *)pthread_self();
58
59#pragma clang diagnostic push
60#pragma clang diagnostic ignored "-Wdeprecated-declarations"
61 // Calls _TSGetMainThread().
62 // https://b33p.net/kosada/node/12944
63 YieldToAnyThread();
64#pragma clang diagnostic pop
65
67
68 // Ensure that the write end of this pipe gets closed upon fork()/exec(),
69 // so child processes don't prop open this pipe,
70 // which would prevent Vuo compositions from quitting when the VuoRunner process quits.
72
73 if (VuoStringUtilities::makeFromCFString(CFBundleGetIdentifier(CFBundleGetMainBundle())) == "com.vidvox.VDMX5")
75}
76
80static bool isMainThread(void)
81{
82 return VuoApp_mainThread == (void *)pthread_self();
83}
84
88static void VuoRunner_configureSocket(void *zmqSocket)
89{
90 int linger = 0; // avoid having zmq_term block if the runner has tried to send a message on a broken connection
91 zmq_setsockopt(zmqSocket, ZMQ_LINGER, &linger, sizeof linger);
92}
93
98{
99public:
100 Private() :
101 lastWidth(0),
102 lastHeight(0)
103 {
104 }
105
107 typedef void *(*vuoImageMakeFromJsonWithDimensionsType)(json_object *, unsigned int, unsigned int);
109 typedef json_object *(*vuoImageGetInterprocessJsonType)(void *);
111
112 uint64_t lastWidth;
113 uint64_t lastHeight;
114};
115
126VuoRunner * VuoRunner::newSeparateProcessRunnerFromExecutable(string executablePath, string sourceDir,
127 bool continueIfRunnerDies, bool deleteExecutableWhenFinished)
128{
129 VuoRunner * vr = new VuoRunner();
130 vr->executablePath = executablePath;
131 vr->shouldContinueIfRunnerDies = continueIfRunnerDies;
132 vr->shouldDeleteBinariesWhenFinished = deleteExecutableWhenFinished;
133 vr->sourceDir = sourceDir;
134 return vr;
135}
136
152VuoRunner * VuoRunner::newSeparateProcessRunnerFromDynamicLibrary(string compositionLoaderPath, string compositionDylibPath,
153 const std::shared_ptr<VuoRunningCompositionLibraries> &runningCompositionLibraries,
154 string sourceDir, bool continueIfRunnerDies, bool deleteDylibsWhenFinished)
155{
156 VuoRunner * vr = new VuoRunner();
157 vr->executablePath = compositionLoaderPath;
158 vr->dylibPath = compositionDylibPath;
159 vr->dependencyLibraries = runningCompositionLibraries;
160 vr->sourceDir = sourceDir;
161 vr->shouldContinueIfRunnerDies = continueIfRunnerDies;
162 vr->shouldDeleteBinariesWhenFinished = deleteDylibsWhenFinished;
163 runningCompositionLibraries->setDeleteResourceLibraries(deleteDylibsWhenFinished);
164 return vr;
165}
166
177 bool deleteDylibWhenFinished)
178{
179 VuoRunner * vr = new VuoRunner();
180 vr->dylibPath = dylibPath;
181 vr->shouldDeleteBinariesWhenFinished = deleteDylibWhenFinished;
182 vr->sourceDir = sourceDir;
183 return vr;
184}
185
192{
193 dispatch_release(stoppedSemaphore);
194 dispatch_release(terminatedZMQContextSemaphore);
195 dispatch_release(beganListeningSemaphore);
196 dispatch_release(endedListeningSemaphore);
197 dispatch_release(lastFiredEventSemaphore);
198 dispatch_release(delegateQueue);
199 delete p;
200}
201
206void VuoRunner::setRuntimeChecking(bool runtimeCheckingEnabled)
207{
208 if (!stopped)
209 {
210 VUserLog("Error: Only call VuoRunner::setRuntimeChecking() prior to starting the composition.");
211 return;
212 }
213
214 isRuntimeCheckingEnabled = runtimeCheckingEnabled && VuoFileUtilities::fileExists(mainThreadChecker);
215}
216
220VuoRunner::VuoRunner(void)
221{
222 p = new Private;
223 dylibHandle = NULL;
224 dependencyLibraries = NULL;
225 shouldContinueIfRunnerDies = false;
226 shouldDeleteBinariesWhenFinished = false;
227 isRuntimeCheckingEnabled = false;
228 paused = true;
229 stopped = true;
230 lostContact = false;
231 listenCanceled = false;
232 stoppedSemaphore = dispatch_semaphore_create(1);
233 terminatedZMQContextSemaphore = dispatch_semaphore_create(0);
234 beganListeningSemaphore = dispatch_semaphore_create(0);
235 endedListeningSemaphore = dispatch_semaphore_create(1);
236 lastFiredEventSemaphore = dispatch_semaphore_create(0);
237 lastFiredEventSignaled = false;
238 controlQueue = dispatch_queue_create("org.vuo.runner.control", NULL);
239 ZMQContext = NULL;
240 ZMQSelfSend = NULL;
241 ZMQSelfReceive = NULL;
242 ZMQControl = NULL;
243 ZMQTelemetry = NULL;
244 ZMQLoaderControl = NULL;
245 delegate = NULL;
246 delegateQueue = dispatch_queue_create("org.vuo.runner.delegate", NULL);
247 arePublishedInputPortsCached = false;
248 arePublishedOutputPortsCached = false;
249
250 static once_flag sleepHandlersInstalled;
251 call_once(sleepHandlersInstalled, [](){
253 });
254}
255
272{
273 try
274 {
275 startInternal();
276
277 if (isInCurrentProcess())
278 {
279 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
280 dispatch_async(queue, ^{
281 unpause();
282 });
283 while (paused)
284 {
286 usleep(USEC_PER_SEC / 1000);
287 }
288 }
289 else
290 {
291 unpause();
292 }
293 }
294 catch (VuoException &e)
295 {
296 stopBecauseLostContact(e.what());
297 }
298}
299
319{
320 try
321 {
322 startInternal();
323 }
324 catch (VuoException &e)
325 {
326 stopBecauseLostContact(e.what());
327 }
328}
329
334void VuoRunner::copyDylibAndChangeId(string dylibPath, string &outputDylibPath)
335{
336 string directory, file, extension;
337 VuoFileUtilities::splitPath(dylibPath, directory, file, extension);
338
339 const int makeTmpFileExtension = 7;
340 if (file.length() > makeTmpFileExtension)
341 {
342 // makeTmpFile() appends "-XXXXXX"; make room for that.
343 string trimmedFile = file.substr(0, file.length() - makeTmpFileExtension);
344
345 bool alreadyLoaded;
346 do
347 {
348 outputDylibPath = VuoFileUtilities::makeTmpFile(trimmedFile, "dylib");
349 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
350 } while (alreadyLoaded);
351 }
352 else
353 {
354 // For short names, like those generated by VDMX, just replace the entire name with a hash.
355 // https://b33p.net/kosada/node/12917
356 bool alreadyLoaded;
357 do
358 {
359 string hash = VuoStringUtilities::makeRandomHash(file.length());
360 outputDylibPath = "/tmp/" + hash + ".dylib";
361 alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
362 } while (alreadyLoaded);
363 }
364
365 string newDirectory, newFile, newExtension;
366 VuoFileUtilities::splitPath(outputDylibPath, newDirectory, newFile, newExtension);
367
368 if (newFile.length() > file.length())
369 throw VuoException("The composition couldn't start because the uniqued dylib name (" + newFile + ") is longer than the original dylib name (" + file + ").");
370
371 if (copyfile(dylibPath.c_str(), outputDylibPath.c_str(), NULL, COPYFILE_ALL))
372 throw VuoException("The composition couldn't start because a copy of the dylib couldn't be made.");
373
374 FILE *fp = fopen(outputDylibPath.c_str(), "r+b");
375 if (!fp)
376 throw VuoException("The composition couldn't start because the dylib's header couldn't be opened.");
377 VuoDefer(^{ fclose(fp); });
378
379 struct mach_header_64 header;
380 if (fread(&header, sizeof(header), 1, fp) != 1)
381 throw VuoException("The composition couldn't start because the dylib's header couldn't be read.");
382
383 if (header.magic != MH_MAGIC_64
384 || header.cputype != CPU_TYPE_X86_64)
385 throw VuoException("The composition couldn't start because the dylib isn't an x86_64-only (non-fat) Mach-O binary.");
386
387 for (int i = 0; i < header.ncmds; ++i)
388 {
389 struct load_command lc;
390 if (fread(&lc, sizeof(lc), 1, fp) != 1)
391 throw VuoException("The composition couldn't start because the dylib's command couldn't be read.");
392
393 // VLog("cmd[%d]: %x (size %d)",i,lc.cmd,lc.cmdsize);
394 if (lc.cmd == LC_ID_DYLIB)
395 {
396 fseek(fp, sizeof(struct dylib), SEEK_CUR);
397
398 size_t nameLength = lc.cmdsize - sizeof(struct dylib_command);
399 char *name = (char *)calloc(nameLength + 1, 1);
400 if (fread(name, nameLength, 1, fp) != 1)
401 throw VuoException("The composition couldn't start because the dylib's ID command couldn't be read.");
402
403// VLog("Changing name \"%s\" to \"%s\"…", name, outputDylibPath.c_str());
404 fseek(fp, -nameLength, SEEK_CUR);
405 bzero(name, nameLength);
406 memcpy(name, outputDylibPath.c_str(), min(nameLength, outputDylibPath.length()));
407 fwrite(name, nameLength, 1, fp);
408 return;
409 }
410 else
411 fseek(fp, lc.cmdsize-sizeof(lc), SEEK_CUR);
412 }
413
414 throw VuoException("The composition couldn't start because the dylib's LC_ID_DYLIB command couldn't be found.");
415}
416
426void VuoRunner::startInternal(void)
427{
428 stopped = false;
429 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
430
431 ZMQContext = zmq_init(1);
432
433 if (isInCurrentProcess())
434 {
435 // Start the composition in the current process.
436
437 bool alreadyLoaded = dlopen(dylibPath.c_str(), RTLD_NOLOAD);
438 if (alreadyLoaded)
439 {
440 // Each composition instance needs its own global variables.
441 // Change the dylib's internal name, to convince dlopen() to load another instance of it.
442
443 string uniquedDylibPath;
444 copyDylibAndChangeId(dylibPath, uniquedDylibPath);
445 VDebugLog("\"%s\" is already loaded, so I duplicated it and changed its LC_ID_DYLIB to \"%s\".", dylibPath.c_str(), uniquedDylibPath.c_str());
446
447 if (shouldDeleteBinariesWhenFinished)
448 remove(dylibPath.c_str());
449
450 dylibPath = uniquedDylibPath;
451 shouldDeleteBinariesWhenFinished = true;
452 }
453
454 dylibHandle = dlopen(dylibPath.c_str(), RTLD_NOW);
455 if (!dylibHandle)
456 throw VuoException("The composition couldn't start because the library '" + dylibPath + "' couldn't be loaded : " + dlerror());
457
458 try
459 {
461 if (! vuoInitInProcess)
462 throw VuoException("The composition couldn't start because vuoInitInProcess() couldn't be found in '" + dylibPath + "' : " + dlerror());
463
464 ZMQControlURL = "inproc://" + VuoFileUtilities::makeTmpFile("vuo-control", "");
465 ZMQTelemetryURL = "inproc://" + VuoFileUtilities::makeTmpFile("vuo-telemetry", "");
466
467 vuoInitInProcess(ZMQContext, ZMQControlURL.c_str(), ZMQTelemetryURL.c_str(), true, getpid(), -1, false,
468 sourceDir.c_str(), dylibHandle, NULL, false);
469 }
470 catch (VuoException &e)
471 {
472 VUserLog("error: %s", e.what());
473 dlclose(dylibHandle);
474 dylibHandle = NULL;
475 throw;
476 }
477 }
478 else
479 {
480 // Start the composition or composition loader in a new process.
481
482 vector<string> args;
483
484 string executableName;
485 if (isUsingCompositionLoader())
486 {
487 // If we're using the loader, set the executable's display name to the dylib,
488 // so that composition's name shows up in the process list.
489 string dir, file, ext;
490 VuoFileUtilities::splitPath(dylibPath, dir, file, ext);
491 executableName = file;
492 }
493 else
494 {
495 string dir, file, ext;
496 VuoFileUtilities::splitPath(executablePath, dir, file, ext);
497 string executableName = file;
498 if (! ext.empty())
499 executableName += "." + ext;
500 }
501 args.push_back(executableName);
502
503 // https://b33p.net/kosada/node/16374
504 // The socket's full pathname (`sockaddr_un::sun_path`) must be 104 characters or less
505 // (https://opensource.apple.com/source/xnu/xnu-2782.1.97/bsd/sys/un.h.auto.html).
506 // "/Users/me/Library/Containers/com.apple.ScreenSaver.Engine.legacyScreenSaver/Data/vuo-telemetry-rr8Br3"
507 // is 101 characters, which limits the username to 5 characters.
508 // "/Users/me/Library/Containers/com.apple.ScreenSaver.Engine.legacyScreenSaver/Data/v-rr8Br3"
509 // is 89 characters, which limits the username to 17 characters
510 // (still not a lot, but more likely to work with typical macOS usernames).
511 ZMQControlURL = "ipc://" + VuoFileUtilities::makeTmpFile("v", "");
512 ZMQTelemetryURL = "ipc://" + VuoFileUtilities::makeTmpFile("v", "");
513 args.push_back("--vuo-control=" + ZMQControlURL);
514 args.push_back("--vuo-telemetry=" + ZMQTelemetryURL);
515
516 {
517 ostringstream oss;
518 oss << getpid();
519 args.push_back("--vuo-runner-pid=" + oss.str());
520 }
521
522 {
523 ostringstream oss;
525 args.push_back("--vuo-runner-pipe=" + oss.str());
526 }
527
528 if (shouldContinueIfRunnerDies)
529 args.push_back("--vuo-continue-if-runner-dies");
530
531 if (isUsingCompositionLoader())
532 {
533 ZMQLoaderControlURL = "ipc://" + VuoFileUtilities::makeTmpFile("v", "");
534 args.push_back("--vuo-loader=" + ZMQLoaderControlURL);
535 }
536 else
537 args.push_back("--vuo-pause");
538
539 int fd[2];
540 int ret = pipe(fd);
541 if (ret)
542 throw VuoException("The composition couldn't start because a pipe couldn't be opened : " + string(strerror(errno)));
543
544 int argSize = args.size();
545 char *argv[argSize + 1];
546 for (size_t i = 0; i < argSize; ++i)
547 {
548 size_t mallocSize = args[i].length() + 1;
549 argv[i] = (char *)malloc(mallocSize);
550 strlcpy(argv[i], args[i].c_str(), mallocSize);
551 }
552 argv[argSize] = NULL;
553
554 string errorWorkingDirectory = "The composition couldn't start because the working directory couldn't be changed to '" + sourceDir + "' : ";
555 string errorExecutable = "The composition couldn't start because the file '" + executablePath + "' couldn't be executed : ";
556 string errorFork = "The composition couldn't start because the composition process couldn't be forked : ";
557 const size_t ERROR_BUFFER_LEN = 256;
558 char errorBuffer[ERROR_BUFFER_LEN];
559
560 pipe(runnerReadCompositionWritePipe);
561
562 pid_t childPid = fork();
563 if (childPid == 0)
564 {
565 // There are only a limited set of functions you're allowed to call in the child process
566 // after fork() and before exec(). Functions such as VUserLog() and exit() aren't allowed,
567 // so instead we're calling alternatives such as write() and _exit().
568
569 close(runnerReadCompositionWritePipe[0]);
570
571 pid_t grandchildPid = fork();
572 if (grandchildPid == 0)
573 {
574 close(fd[0]);
575 close(fd[1]);
576
577 // Set the current working directory to that of the source .vuo composition so that
578 // relative URL paths are resolved correctly.
579 if (!sourceDir.empty())
580 {
581 ret = chdir(sourceDir.c_str());
582 if (ret)
583 {
584 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
585 write(STDERR_FILENO, errorWorkingDirectory.c_str(), errorWorkingDirectory.length());
586 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
587 write(STDERR_FILENO, "\n", 1);
588 _exit(-1);
589 }
590 }
591
592 if (isRuntimeCheckingEnabled)
593 {
594 const char *vuoRuntimeChecking = getenv("VUO_RUNTIME_CHECKING");
595 if (vuoRuntimeChecking)
596 setenv("DYLD_INSERT_LIBRARIES", vuoRuntimeChecking, 1);
597 else
598 setenv("DYLD_INSERT_LIBRARIES", mainThreadChecker, 1);
599 }
600
601 ret = execv(executablePath.c_str(), argv);
602 if (ret)
603 {
604 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
605 write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
606 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
607 write(STDERR_FILENO, "\n", 1);
608 for (size_t i = 0; i < argSize; ++i)
609 free(argv[i]);
610 _exit(-1);
611 }
612 }
613 else if (grandchildPid > 0)
614 {
615 close(fd[0]);
616
617 int ret = write(fd[1], &grandchildPid, sizeof(pid_t));
618 if (ret != sizeof(pid_t))
619 {
620 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
621 write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
622 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
623 write(STDERR_FILENO, "\n", 1);
624 }
625 close(fd[1]);
626
627 _exit(0);
628 }
629 else
630 {
631 close(fd[0]);
632 close(fd[1]);
633
634 strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
635 write(STDERR_FILENO, errorFork.c_str(), errorFork.length());
636 write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
637 write(STDERR_FILENO, "\n", 1);
638 _exit(-1);
639 }
640 }
641 else if (childPid > 0)
642 {
643 close(fd[1]);
644
645 // If this process launches compositions in addition to this one,
646 // ensure they don't prop open this pipe,
647 // which would prevent VuoRunner::stop's `read()` from terminating.
648 VuoRunner_closeOnExec(runnerReadCompositionWritePipe[1]);
649
650 for (size_t i = 0; i < argSize; ++i)
651 free(argv[i]);
652
653 pid_t grandchildPid = 0;
654 int ret = read(fd[0], &grandchildPid, sizeof(pid_t));
655 if (ret != sizeof(pid_t))
656 throw VuoException("The composition couldn't start because the composition process id couldn't be obtained: " + string(strerror(errno)));
657 close(fd[0]);
658
659 // Reap the child process.
660 int status;
661 do {
662 ret = waitpid(childPid, &status, 0);
663 } while (ret == -1 && errno == EINTR);
664 if (WIFEXITED(status) && WEXITSTATUS(status))
665 throw VuoException("The composition couldn't start because the parent of the composition process exited with an error.");
666 else if (WIFSIGNALED(status))
667 throw VuoException("The composition couldn't start because the parent of the composition process exited abnormally : " + string(strsignal(WTERMSIG(status))));
668
669 if (grandchildPid > 0)
670 compositionPid = grandchildPid;
671 else
672 throw VuoException("The composition couldn't start because the composition process id couldn't be obtained");
673 }
674 else
675 {
676 for (size_t i = 0; i < argSize; ++i)
677 free(argv[i]);
678
679 throw VuoException("The composition couldn't start because the parent of the composition process couldn't be forked : " + string(strerror(errno)));
680 }
681 }
682
683 // Connect to the composition loader (if any) and composition.
684 if (isUsingCompositionLoader())
685 {
686 ZMQLoaderControl = zmq_socket(ZMQContext,ZMQ_REQ);
688
689 // Try to connect to the composition loader. If at first we don't succeed, give the composition loader a little more time to set up the socket.
690 int numTries = 0;
691 while (zmq_connect(ZMQLoaderControl,ZMQLoaderControlURL.c_str()))
692 {
693 if (++numTries == 1000)
694 throw VuoException("The composition couldn't start because the runner couldn't establish communication with the composition loader : " + string(strerror(errno)));
695 usleep(USEC_PER_SEC / 1000);
696 }
697
698 // Actually start the composition.
699 // Since we don't know how long this will take, the socket has an infinite timeout (https://b33p.net/kosada/vuo/vuo/-/issues/18450).
700 replaceComposition(dylibPath, "");
701 }
702 else
703 {
704 __block string errorMessage;
705 dispatch_sync(controlQueue, ^{
706 try {
707 setUpConnections();
708 } catch (VuoException &e) {
709 errorMessage = e.what();
710 }
711 });
712 if (! errorMessage.empty())
713 throw VuoException(errorMessage);
714 }
715}
716
721void *VuoRunner_listen(void *context)
722{
723 pthread_detach(pthread_self());
724 VuoRunner *runner = static_cast<VuoRunner *>(context);
725 runner->listen();
726 return NULL;
727}
728
734void VuoRunner::setUpConnections(void)
735{
736 ZMQControl = zmq_socket(ZMQContext,ZMQ_REQ);
738
739 // Try to connect to the composition. If at first we don't succeed, give the composition a little more time to set up the socket.
740 int numTries = 0;
741 while (zmq_connect(ZMQControl,ZMQControlURL.c_str()))
742 {
743 if (++numTries == 1000)
744 throw VuoException("The composition couldn't start because the runner couldn't establish communication to control the composition : " + string(strerror(errno)));
745 usleep(USEC_PER_SEC / 1000);
746 }
747
748 // Cache published ports so they're available whenever a caller starts listening for published port value changes.
749 arePublishedInputPortsCached = false;
750 arePublishedOutputPortsCached = false;
751 if (isInCurrentProcess())
752 {
753 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
754 __block string publishedPortsError;
755 dispatch_async(queue, ^{
756 try {
757 getCachedPublishedPorts(false);
758 getCachedPublishedPorts(true);
759 } catch (VuoException &e) {
760 publishedPortsError = e.what();
761 }
762 });
763 while (! (arePublishedInputPortsCached && arePublishedOutputPortsCached) )
764 {
766 usleep(USEC_PER_SEC / 1000);
767
768 if (! publishedPortsError.empty())
769 throw VuoException(publishedPortsError);
770 }
771 }
772 else
773 {
774 getCachedPublishedPorts(false);
775 getCachedPublishedPorts(true);
776 }
777
778 listenCanceled = false;
779 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
780
781 pthread_t listenThread;
782 int ret = pthread_create(&listenThread, nullptr, &VuoRunner_listen, this);
783 if (ret)
784 throw VuoException(string("The composition couldn't start because the runner couldn't create a thread: ") + strerror(ret));
785
786 dispatch_semaphore_wait(beganListeningSemaphore, DISPATCH_TIME_FOREVER);
787 if (!listenError.empty())
788 throw VuoException("The composition couldn't start because the runner couldn't establish communication to listen to the composition: " + listenError);
789
792}
793
809{
810 if (! isInCurrentProcess())
811 throw VuoException("The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
812
813 if (! isMainThread())
814 throw VuoException("This is not the main thread. Only call this function from the main thread.");
815
816 while (! stopped)
817 VuoEventLoop_processEvent(VuoEventLoop_WaitIndefinitely);
818}
819
845{
846 if (! isInCurrentProcess())
847 throw VuoException("The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
848
849 if (! isMainThread())
850 throw VuoException("This is not the main thread. Only call this function from the main thread.");
851
852 VuoEventLoop_processEvent(VuoEventLoop_RunOnce);
853}
854
863{
864 dispatch_sync(controlQueue, ^{
865 if (stopped || lostContact) {
866 return;
867 }
868
870
871 try
872 {
875 }
876 catch (VuoException &e)
877 {
878 stopBecauseLostContact(e.what());
879 }
880
881 paused = true;
882 });
883}
884
891{
892 dispatch_sync(controlQueue, ^{
893 if (stopped || lostContact) {
894 return;
895 }
896
898
899 try
900 {
903 }
904 catch (VuoException &e)
905 {
906 stopBecauseLostContact(e.what());
907 }
908
909 paused = false;
910 });
911}
912
932void VuoRunner::replaceComposition(string compositionDylibPath, string compositionDiff)
933{
934 if (! isUsingCompositionLoader())
935 throw VuoException("The runner is not using a composition loader. Only use this function if the composition was constructed with newSeparateProcessRunnerFromDynamicLibrary().");
936
937 dispatch_sync(controlQueue, ^{
938 if (stopped || lostContact) {
939 return;
940 }
941
942 VUserLog("Loading composition…");
943
944 if (dylibPath != compositionDylibPath)
945 {
946 if (shouldDeleteBinariesWhenFinished)
947 {
948 remove(dylibPath.c_str());
949 }
950
951 dylibPath = compositionDylibPath;
952 }
953
955
956 try
957 {
958 if (! paused)
959 {
960 VUserLog(" Pausing…");
963 }
964
965 cleanUpConnections();
966
967 vector<string> dependencyDylibPathsRemoved = dependencyLibraries->dequeueLibrariesToUnload();
968 vector<string> dependencyDylibPathsAdded = dependencyLibraries->dequeueLibrariesToLoad();
969
970 unsigned int messageCount = 4 + dependencyDylibPathsAdded.size() + dependencyDylibPathsRemoved.size();
971 zmq_msg_t *messages = (zmq_msg_t *)malloc(messageCount * sizeof(zmq_msg_t));
972 int index = 0;
973
974 vuoInitMessageWithString(&messages[index++], dylibPath.c_str());
975
976 vuoInitMessageWithInt(&messages[index++], dependencyDylibPathsAdded.size());
977 for (vector<string>::iterator i = dependencyDylibPathsAdded.begin(); i != dependencyDylibPathsAdded.end(); ++i) {
978 vuoInitMessageWithString(&messages[index++], (*i).c_str());
979 }
980
981 vuoInitMessageWithInt(&messages[index++], dependencyDylibPathsRemoved.size());
982 for (vector<string>::iterator i = dependencyDylibPathsRemoved.begin(); i != dependencyDylibPathsRemoved.end(); ++i) {
983 vuoInitMessageWithString(&messages[index++], (*i).c_str());
984 }
985
986 vuoInitMessageWithString(&messages[index], compositionDiff.c_str());
987
988 if (! paused)
989 VUserLog(" Replacing composition…");
990
991 vuoLoaderControlRequestSend(VuoLoaderControlRequestCompositionReplace,messages,messageCount);
992 vuoLoaderControlReplyReceive(VuoLoaderControlReplyCompositionReplaced);
993
994 setUpConnections();
995
996 if (! paused)
997 {
998 VUserLog(" Unpausing…");
1001 }
1002
1003 VUserLog(" Done.");
1004 }
1005 catch (VuoException &e)
1006 {
1007 stopBecauseLostContact(e.what());
1008 }
1009 });
1010}
1011
1026{
1027 dispatch_sync(controlQueue, ^{
1028 if (stopped) {
1029 return;
1030 }
1031
1033
1034 // Only tell the composition to stop if it hasn't already ended on its own.
1035 if (! lostContact)
1036 {
1037 try
1038 {
1039 int timeoutInSeconds = (isInCurrentProcess() ? -1 : 5);
1040 zmq_msg_t messages[3];
1041 vuoInitMessageWithInt(&messages[0], timeoutInSeconds);
1042 vuoInitMessageWithBool(&messages[1], false); // isBeingReplaced
1043 vuoInitMessageWithBool(&messages[2], !isInCurrentProcess()); // isLastEverInProcess
1045
1046 if (isInCurrentProcess() && isMainThread())
1047 {
1048 // If VuoRunner::stop() is blocking the main thread, wait for the composition's reply on another thread, and drain the main queue, in case the composition needs to shut down stuff that requires the main queue.
1049 __block bool replyReceived = false;
1050 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
1052 try
1053 {
1055 }
1056 catch (...)
1057 {
1058 // do nothing; doesn't matter if connection timed out
1059 }
1060 replyReceived = true;
1061 });
1062 while (!replyReceived)
1063 {
1065 usleep(USEC_PER_SEC / 1000);
1066 }
1068 }
1069 else
1071 }
1072 catch (...)
1073 {
1074 // do nothing; doesn't matter if connection timed out
1075 }
1076 }
1077
1078 cleanUpConnections();
1079
1080 if (isUsingCompositionLoader() && ZMQLoaderControl)
1081 {
1082 zmq_close(ZMQLoaderControl);
1083 ZMQLoaderControl = NULL;
1084 }
1085
1086 if (isInCurrentProcess() && dylibHandle)
1087 {
1089 if (vuoInitInProcess) // Avoid double jeopardy if startInternal() already failed for missing vuoInitInProcess.
1090 {
1091 VuoFiniType *vuoFini = (VuoFiniType *)dlsym(dylibHandle, "vuoFini");
1092 if (! vuoFini)
1093 {
1094 VUserLog("The composition couldn't stop because vuoFini() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1095 return;
1096 }
1097 void *runtimeState = vuoFini();
1098
1100 if (! vuoFiniRuntimeState)
1101 {
1102 VUserLog("The composition couldn't stop because vuoFiniRuntimeState() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1103 return;
1104 }
1106 }
1107
1108 dlclose(dylibHandle);
1109 dylibHandle = NULL;
1110 }
1111 else if (isInCurrentProcess() && !dylibHandle)
1112 {
1113 // If the dylib isn't open, the composition isn't running, so there's nothing to clean up.
1114 }
1115 else
1116 {
1117 char buf[1];
1118 close(runnerReadCompositionWritePipe[1]);
1119
1120 if (! lostContact)
1121 {
1122 // Wait for child process to end.
1123 // Can't use waitpid() since it only waits on child processes, yet compositionPid is a grandchild.
1124 // Instead, do a blocking read() — the grandchild never writes anything to the pipe, and when the grandchild exits,
1125 // read() will return EOF (since it was the last process that had it open for writing).
1126 read(runnerReadCompositionWritePipe[0], &buf, 1);
1127 }
1128
1129 close(runnerReadCompositionWritePipe[0]);
1130
1131 if (! lostContact)
1132 {
1133 zmq_term(ZMQContext);
1134 ZMQContext = NULL;
1135 }
1136 else
1137 {
1138 dispatch_semaphore_wait(terminatedZMQContextSemaphore, DISPATCH_TIME_FOREVER);
1139 }
1140 }
1141
1142 if (shouldDeleteBinariesWhenFinished)
1143 {
1144 if (isUsingCompositionLoader())
1145 {
1146 remove(dylibPath.c_str());
1147 }
1148 else if (isInCurrentProcess())
1149 {
1150 remove(dylibPath.c_str());
1151 }
1152 else
1153 {
1154 remove(executablePath.c_str());
1155 }
1156 }
1157
1158 dependencyLibraries = nullptr; // release shared_ptr
1159
1160 stopped = true;
1161 dispatch_semaphore_signal(stoppedSemaphore);
1163 });
1164}
1165
1169void VuoRunner::cleanUpConnections(void)
1170{
1171 if (! ZMQControl)
1172 return;
1173
1174 zmq_close(ZMQControl);
1175 ZMQControl = NULL;
1176
1177 if (ZMQSelfSend)
1178 // Break out of zmq_poll().
1179 vuoSend("VuoRunner::ZMQSelfSend", ZMQSelfSend, 0, nullptr, 0, false, nullptr);
1180
1181 dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
1182 dispatch_semaphore_signal(endedListeningSemaphore);
1183}
1184
1191{
1192 dispatch_retain(stoppedSemaphore);
1193 dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
1194 dispatch_semaphore_signal(stoppedSemaphore);
1195 dispatch_release(stoppedSemaphore);
1196}
1197
1213void VuoRunner::setInputPortValue(string compositionIdentifier, string portIdentifier, json_object *value)
1214{
1215 const char *valueAsString = json_object_to_json_string_ext(value, JSON_C_TO_STRING_PLAIN);
1216
1217 dispatch_sync(controlQueue, ^{
1218 if (stopped || lostContact) {
1219 return;
1220 }
1221
1223
1224 try
1225 {
1226 zmq_msg_t messages[3];
1227 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1228 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1229 vuoInitMessageWithString(&messages[2], valueAsString);
1232 }
1233 catch (VuoException &e)
1234 {
1235 stopBecauseLostContact(e.what());
1236 }
1237 });
1238}
1239
1253void VuoRunner::fireTriggerPortEvent(string compositionIdentifier, string portIdentifier)
1254{
1255 dispatch_sync(controlQueue, ^{
1256 if (stopped || lostContact) {
1257 return;
1258 }
1259
1261
1262 try
1263 {
1264 zmq_msg_t messages[2];
1265 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1266 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1269 }
1270 catch (VuoException &e)
1271 {
1272 stopBecauseLostContact(e.what());
1273 }
1274 });
1275}
1276
1290json_object * VuoRunner::getInputPortValue(string compositionIdentifier, string portIdentifier)
1291{
1292 __block string valueAsString;
1293 dispatch_sync(controlQueue, ^{
1294 if (stopped || lostContact) {
1295 return;
1296 }
1297
1299
1300 try
1301 {
1302 zmq_msg_t messages[3];
1303 vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1304 vuoInitMessageWithString(&messages[1], compositionIdentifier.c_str());
1305 vuoInitMessageWithString(&messages[2], portIdentifier.c_str());
1308 valueAsString = receiveString("null");
1309 }
1310 catch (VuoException &e)
1311 {
1312 stopBecauseLostContact(e.what());
1313 }
1314 });
1315 return json_tokener_parse(valueAsString.c_str());
1316}
1317
1331json_object * VuoRunner::getOutputPortValue(string compositionIdentifier, string portIdentifier)
1332{
1333 __block string valueAsString;
1334 dispatch_sync(controlQueue, ^{
1335 if (stopped || lostContact) {
1336 return;
1337 }
1338
1340
1341 try
1342 {
1343 zmq_msg_t messages[3];
1344 vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1345 vuoInitMessageWithString(&messages[1], compositionIdentifier.c_str());
1346 vuoInitMessageWithString(&messages[2], portIdentifier.c_str());
1349 valueAsString = receiveString("null");
1350 }
1351 catch (VuoException &e)
1352 {
1353 stopBecauseLostContact(e.what());
1354 }
1355 });
1356 return json_tokener_parse(valueAsString.c_str());
1357}
1358
1372string VuoRunner::getInputPortSummary(string compositionIdentifier, string portIdentifier)
1373{
1374 __block string summary;
1375 dispatch_sync(controlQueue, ^{
1376 if (stopped || lostContact) {
1377 return;
1378 }
1379
1381
1382 try
1383 {
1384 zmq_msg_t messages[2];
1385 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1386 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1389 summary = receiveString("");
1390 }
1391 catch (VuoException &e)
1392 {
1393 stopBecauseLostContact(e.what());
1394 }
1395 });
1396 return summary;
1397}
1398
1412string VuoRunner::getOutputPortSummary(string compositionIdentifier, string portIdentifier)
1413{
1414 __block string summary;
1415 dispatch_sync(controlQueue, ^{
1416 if (stopped || lostContact) {
1417 return;
1418 }
1419
1421
1422 try
1423 {
1424 zmq_msg_t messages[2];
1425 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1426 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1429 summary = receiveString("");
1430 }
1431 catch (VuoException &e)
1432 {
1433 stopBecauseLostContact(e.what());
1434 }
1435 });
1436 return summary;
1437}
1438
1452string VuoRunner::subscribeToInputPortTelemetry(string compositionIdentifier, string portIdentifier)
1453{
1454 __block string summary;
1455 dispatch_sync(controlQueue, ^{
1456 if (stopped || lostContact) {
1457 return;
1458 }
1459
1461
1462 try
1463 {
1464 zmq_msg_t messages[2];
1465 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1466 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1469 summary = receiveString("");
1470 }
1471 catch (VuoException &e)
1472 {
1473 stopBecauseLostContact(e.what());
1474 }
1475 });
1476 return summary;
1477}
1478
1492string VuoRunner::subscribeToOutputPortTelemetry(string compositionIdentifier, string portIdentifier)
1493{
1494 __block string summary;
1495 dispatch_sync(controlQueue, ^{
1496 if (stopped || lostContact) {
1497 return;
1498 }
1499
1501
1502 try
1503 {
1504 zmq_msg_t messages[2];
1505 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1506 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1509 summary = receiveString("");
1510 }
1511 catch (VuoException &e)
1512 {
1513 stopBecauseLostContact(e.what());
1514 }
1515 });
1516 return summary;
1517}
1518
1529void VuoRunner::unsubscribeFromInputPortTelemetry(string compositionIdentifier, string portIdentifier)
1530{
1531 dispatch_sync(controlQueue, ^{
1532 if (stopped || lostContact) {
1533 return;
1534 }
1535
1537
1538 try
1539 {
1540 zmq_msg_t messages[2];
1541 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1542 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1545 }
1546 catch (VuoException &e)
1547 {
1548 stopBecauseLostContact(e.what());
1549 }
1550 });
1551}
1552
1563void VuoRunner::unsubscribeFromOutputPortTelemetry(string compositionIdentifier, string portIdentifier)
1564{
1565 dispatch_sync(controlQueue, ^{
1566 if (stopped || lostContact) {
1567 return;
1568 }
1569
1571
1572 try
1573 {
1574 zmq_msg_t messages[2];
1575 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1576 vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1579 }
1580 catch (VuoException &e)
1581 {
1582 stopBecauseLostContact(e.what());
1583 }
1584 });
1585}
1586
1597void VuoRunner::subscribeToEventTelemetry(string compositionIdentifier)
1598{
1599 dispatch_sync(controlQueue, ^{
1600 if (stopped || lostContact) {
1601 return;
1602 }
1603
1605
1606 try
1607 {
1608 zmq_msg_t messages[1];
1609 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1612 }
1613 catch (VuoException &e)
1614 {
1615 stopBecauseLostContact(e.what());
1616 }
1617 });
1618}
1619
1632void VuoRunner::unsubscribeFromEventTelemetry(string compositionIdentifier)
1633{
1634 dispatch_sync(controlQueue, ^{
1635 if (stopped || lostContact) {
1636 return;
1637 }
1638
1640
1641 try
1642 {
1643 zmq_msg_t messages[1];
1644 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1647 }
1648 catch (VuoException &e)
1649 {
1650 stopBecauseLostContact(e.what());
1651 }
1652 });
1653}
1654
1665void VuoRunner::subscribeToAllTelemetry(string compositionIdentifier)
1666{
1667 dispatch_sync(controlQueue, ^{
1668 if (stopped || lostContact) {
1669 return;
1670 }
1671
1673
1674 try
1675 {
1676 zmq_msg_t messages[1];
1677 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1680 }
1681 catch (VuoException &e)
1682 {
1683 stopBecauseLostContact(e.what());
1684 }
1685 });
1686}
1687
1700void VuoRunner::unsubscribeFromAllTelemetry(string compositionIdentifier)
1701{
1702 dispatch_sync(controlQueue, ^{
1703 if (stopped || lostContact) {
1704 return;
1705 }
1706
1708
1709 try
1710 {
1711 zmq_msg_t messages[1];
1712 vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1715 }
1716 catch (VuoException &e)
1717 {
1718 stopBecauseLostContact(e.what());
1719 }
1720 });
1721}
1722
1736void VuoRunner::setPublishedInputPortValues(map<Port *, json_object *> portsAndValuesToSet)
1737{
1739 for (auto i : portsAndValuesToSet)
1740 {
1741 string portName = i.first->getName();
1742 if (portName == "width")
1743 p->lastWidth = json_object_get_int64(i.second);
1744 else if (portName == "height")
1745 p->lastHeight = json_object_get_int64(i.second);
1746 else if (portName == "image" || portName == "startImage")
1747 {
1748 json_object *o;
1749 if (json_object_object_get_ex(i.second, "pixelsWide", &o))
1750 p->lastWidth = json_object_get_int64(o);
1751 if (json_object_object_get_ex(i.second, "pixelsHigh", &o))
1752 p->lastHeight = json_object_get_int64(o);
1753 }
1754 }
1755
1756 dispatch_sync(controlQueue, ^{
1757 if (stopped || lostContact) {
1758 return;
1759 }
1760
1762
1763 try
1764 {
1765 int messageCount = portsAndValuesToSet.size() * 2;
1766 zmq_msg_t messages[messageCount];
1767
1768 int i = 0;
1769 for (auto &kv : portsAndValuesToSet)
1770 {
1771 vuoInitMessageWithString(&messages[i++], kv.first->getName().c_str());
1772 vuoInitMessageWithString(&messages[i++], json_object_to_json_string_ext(kv.second, JSON_C_TO_STRING_PLAIN));
1773 }
1774
1777 }
1778 catch (VuoException &e)
1779 {
1780 stopBecauseLostContact(e.what());
1781 }
1782 });
1783}
1784
1793{
1794 set<VuoRunner::Port *> portAsSet;
1795 portAsSet.insert(port);
1796 firePublishedInputPortEvent(portAsSet);
1797}
1798
1809void VuoRunner::firePublishedInputPortEvent(const set<Port *> &ports)
1810{
1811 dispatch_sync(controlQueue, ^{
1812 if (stopped || lostContact) {
1813 return;
1814 }
1815
1817
1818 lastFiredEventSignaled = false;
1819
1820 try
1821 {
1822 size_t messageCount = ports.size() + 1;
1823 zmq_msg_t messages[messageCount];
1824
1825 vuoInitMessageWithInt(&messages[0], ports.size());
1826 int i = 1;
1827 for (VuoRunner::Port *port : ports) {
1828 vuoInitMessageWithString(&messages[i++], port->getName().c_str());
1829 }
1830
1833 }
1834 catch (VuoException &e)
1835 {
1836 stopBecauseLostContact(e.what());
1837 }
1838 });
1839}
1840
1866{
1867 saturating_semaphore_wait(lastFiredEventSemaphore, &lastFiredEventSignaled);
1868}
1869
1881{
1882 __block string valueAsString;
1883 dispatch_sync(controlQueue, ^{
1884 if (stopped || lostContact) {
1885 return;
1886 }
1887
1889
1890 try
1891 {
1892 zmq_msg_t messages[2];
1893 vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1894 vuoInitMessageWithString(&messages[1], port->getName().c_str());
1897 valueAsString = receiveString("null");
1898 }
1899 catch (VuoException &e)
1900 {
1901 stopBecauseLostContact(e.what());
1902 }
1903 });
1904 return json_tokener_parse(valueAsString.c_str());
1905}
1906
1918{
1919 __block string valueAsString;
1920 dispatch_sync(controlQueue, ^{
1921 if (stopped || lostContact) {
1922 return;
1923 }
1924
1926
1927 try
1928 {
1929 zmq_msg_t messages[2];
1930 vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1931 vuoInitMessageWithString(&messages[1], port->getName().c_str());
1934 valueAsString = receiveString("null");
1935 }
1936 catch (VuoException &e)
1937 {
1938 stopBecauseLostContact(e.what());
1939 }
1940 });
1941
1942 // https://b33p.net/kosada/node/17535
1943 json_object *js = json_tokener_parse(valueAsString.c_str());
1944 if (VuoRunner_isHostVDMX && port->getName() == "outputImage")
1945 {
1946 json_object *o;
1947 uint64_t actualWidth = 0;
1948 if (json_object_object_get_ex(js, "pixelsWide", &o))
1949 actualWidth = json_object_get_int64(o);
1950 uint64_t actualHeight = 0;
1951 if (json_object_object_get_ex(js, "pixelsHigh", &o))
1952 actualHeight = json_object_get_int64(o);
1953
1954 if (p->lastWidth && p->lastHeight
1955 && (actualWidth != p->lastWidth || actualHeight != p->lastHeight))
1956 {
1957 call_once(p->vuoImageFunctionsInitialized, [=](){
1958 p->vuoImageMakeFromJsonWithDimensions = (Private::vuoImageMakeFromJsonWithDimensionsType)dlsym(RTLD_SELF, "VuoImage_makeFromJsonWithDimensions");
1959 if (!p->vuoImageMakeFromJsonWithDimensions)
1960 {
1961 VUserLog("Error: Couldn't find VuoImage_makeFromJsonWithDimensions.");
1962 return;
1963 }
1964
1965 p->vuoImageGetInterprocessJson = (Private::vuoImageGetInterprocessJsonType)dlsym(RTLD_SELF, "VuoImage_getInterprocessJson");
1967 {
1968 VUserLog("Error: Couldn't find VuoImage_getInterprocessJson.");
1969 return;
1970 }
1971 });
1972
1974 {
1976 return p->vuoImageGetInterprocessJson(vi);
1977 }
1978 }
1979 }
1980
1981 return js;
1982}
1983
1998vector<VuoRunner::Port *> VuoRunner::getCachedPublishedPorts(bool input)
1999{
2000 // Caching not only provides faster access (without zmq messages),
2001 // but also ensures that the VuoRunner::Port pointers passed to
2002 // VuoRunnerDelegate::receivedTelemetryPublishedOutputPortUpdated are consistent.
2003
2004 if (input)
2005 {
2006 if (! arePublishedInputPortsCached)
2007 {
2008 publishedInputPorts = refreshPublishedPorts(true);
2009 arePublishedInputPortsCached = true;
2010 }
2011 return publishedInputPorts;
2012 }
2013 else
2014 {
2015 if (! arePublishedOutputPortsCached)
2016 {
2017 publishedOutputPorts = refreshPublishedPorts(false);
2018 arePublishedOutputPortsCached = true;
2019 }
2020 return publishedOutputPorts;
2021 }
2022}
2023
2035vector<VuoRunner::Port *> VuoRunner::refreshPublishedPorts(bool input)
2036{
2037 dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
2038 dispatch_source_t timeout = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
2039 dispatch_source_set_timer(timeout, dispatch_time(DISPATCH_TIME_NOW, 5*NSEC_PER_SEC), NSEC_PER_SEC, NSEC_PER_SEC/10);
2040 dispatch_source_set_event_handler(timeout, ^{
2041 stopBecauseLostContact("The connection between the composition and runner timed out when trying to receive the list of published ports");
2042 dispatch_source_cancel(timeout);
2043 });
2044 dispatch_resume(timeout);
2045
2046 vector<VuoRunner::Port *> ports;
2047
2048 try
2049 {
2051
2052 enum VuoControlRequest requests[4];
2053 enum VuoControlReply replies[4];
2054 if (input)
2055 {
2062 }
2063 else
2064 {
2071 }
2072
2073 vector<string> names;
2074 vector<string> types;
2075 vector<string> details;
2076
2077 for (int i = 0; i < 3; ++i)
2078 {
2079 vuoControlRequestSend(requests[i], NULL, 0);
2080 vuoControlReplyReceive(replies[i]);
2081 vector<string> messageStrings = receiveListOfStrings();
2082 if (i == 0)
2083 names = messageStrings;
2084 else if (i == 1)
2085 types = messageStrings;
2086 else
2087 details = messageStrings;
2088 }
2089
2090 for (size_t i = 0; i < names.size() && i < types.size() && i < details.size(); ++i)
2091 {
2092 VuoRunner::Port *port = new Port(names[i], types[i], json_tokener_parse(details[i].c_str()));
2093 ports.push_back(port);
2094 }
2095 }
2096 catch (...)
2097 {
2098 dispatch_source_cancel(timeout);
2099 dispatch_release(timeout);
2100 throw;
2101 }
2102
2103 dispatch_source_cancel(timeout);
2104 dispatch_release(timeout);
2105
2106 return ports;
2107}
2108
2118vector<VuoRunner::Port *> VuoRunner::getPublishedInputPorts(void)
2119{
2120 return getCachedPublishedPorts(true);
2121}
2122
2132vector<VuoRunner::Port *> VuoRunner::getPublishedOutputPorts(void)
2133{
2134 return getCachedPublishedPorts(false);
2135}
2136
2147{
2148 vector<VuoRunner::Port *> inputPorts = getPublishedInputPorts();
2149 for (vector<VuoRunner::Port *>::iterator i = inputPorts.begin(); i != inputPorts.end(); ++i)
2150 if ((*i)->getName() == name)
2151 return *i;
2152
2153 return NULL;
2154}
2155
2166{
2167 vector<VuoRunner::Port *> outputPorts = getPublishedOutputPorts();
2168 for (vector<VuoRunner::Port *>::iterator i = outputPorts.begin(); i != outputPorts.end(); ++i)
2169 if ((*i)->getName() == name)
2170 return *i;
2171
2172 return NULL;
2173}
2174
2186void VuoRunner::listen()
2187{
2188 // Name this thread.
2189 {
2190 const char *compositionName = dylibPath.empty() ? executablePath.c_str() : dylibPath.c_str();
2191
2192 // Trim the path, if present.
2193 if (const char *lastSlash = strrchr(compositionName, '/'))
2194 compositionName = lastSlash + 1;
2195
2196 char threadName[MAXTHREADNAMESIZE];
2197 snprintf(threadName, MAXTHREADNAMESIZE, "org.vuo.runner.telemetry: %s", compositionName);
2198 pthread_setname_np(threadName);
2199 }
2200
2201 ZMQSelfReceive = zmq_socket(ZMQContext, ZMQ_PAIR);
2202 VuoRunner_configureSocket(ZMQSelfReceive);
2203 if (zmq_bind(ZMQSelfReceive, "inproc://vuo-runner-self") != 0)
2204 {
2205 listenError = strerror(errno);
2206 dispatch_semaphore_signal(beganListeningSemaphore);
2207 return;
2208 }
2209
2210 ZMQSelfSend = zmq_socket(ZMQContext, ZMQ_PAIR);
2211 VuoRunner_configureSocket(ZMQSelfSend);
2212 if (zmq_connect(ZMQSelfSend, "inproc://vuo-runner-self") != 0)
2213 {
2214 listenError = strerror(errno);
2215 dispatch_semaphore_signal(beganListeningSemaphore);
2216 return;
2217 }
2218
2219 {
2220 ZMQTelemetry = zmq_socket(ZMQContext,ZMQ_SUB);
2221 VuoRunner_configureSocket(ZMQTelemetry);
2222 if(zmq_connect(ZMQTelemetry,ZMQTelemetryURL.c_str()))
2223 {
2224 listenError = strerror(errno);
2225 dispatch_semaphore_signal(beganListeningSemaphore);
2226 return;
2227 }
2228
2229 const int highWaterMark = 0; // no limit
2230 if(zmq_setsockopt(ZMQTelemetry,ZMQ_RCVHWM,&highWaterMark,sizeof(highWaterMark)))
2231 {
2232 listenError = strerror(errno);
2233 dispatch_semaphore_signal(beganListeningSemaphore);
2234 return;
2235 }
2236 }
2237
2238 {
2239 // subscribe to all types of telemetry
2240 char type = VuoTelemetryHeartbeat;
2241 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2243 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2245 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2247 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2249 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2251 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2253 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2255 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2256 type = VuoTelemetryError;
2257 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2259 zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2260 }
2261
2262 {
2263 // Wait until the connection is established, as evidenced by a heartbeat telemetry message
2264 // being received from the composition. This is necessary because the ØMQ API doesn't provide
2265 // any way to tell when a SUB socket is ready to receive messages, and if you call zmq_poll()
2266 // on it before it's ready, then it might miss messages that came in while it was still trying
2267 // to get ready. (The zmq_connect() function doesn't make any guarantees about the socket being ready.
2268 // It just starts some setup that may continue asynchronously after zmq_connect() has returned.)
2269 // To avoid missing important telemetry messages from the composition, we make sure that the
2270 // runner doesn't tell the composition to unpause until the runner has verified that it's
2271 // receiving heartbeat telemetry messages. http://zguide.zeromq.org/page:all#Node-Coordination
2272 zmq_pollitem_t items[]=
2273 {
2274 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2275 };
2276 int itemCount = 1;
2277 long timeout = -1;
2278 zmq_poll(items,itemCount,timeout);
2279 }
2280
2281 dispatch_semaphore_signal(beganListeningSemaphore);
2282
2283 bool pendingCancel = false;
2284 while(! listenCanceled)
2285 {
2286 zmq_pollitem_t items[]=
2287 {
2288 {ZMQTelemetry,0,ZMQ_POLLIN,0},
2289 {ZMQSelfReceive,0,ZMQ_POLLIN,0},
2290 };
2291 int itemCount = 2;
2292
2293 // Wait 1 second. If no telemetry was received in that second, we probably lost contact with the composition.
2294 long timeout = pendingCancel ? 100 : 1000;
2295 zmq_poll(items,itemCount,timeout);
2296 if(items[0].revents & ZMQ_POLLIN)
2297 {
2298 // Receive telemetry type.
2299 char type = vuoReceiveInt(ZMQTelemetry, NULL);
2300
2301 // Receive telemetry arguments and forward to VuoRunnerDelegate.
2302 switch (type)
2303 {
2305 {
2306 dispatch_sync(delegateQueue, ^{
2307 if (delegate)
2308 delegate->receivedTelemetryStats(0, 0);
2309 });
2310 break;
2311 }
2313 {
2314 char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2315 char *nodeIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2316 dispatch_sync(delegateQueue, ^{
2317 if (delegate)
2318 delegate->receivedTelemetryNodeExecutionStarted(compositionIdentifier, nodeIdentifier);
2319 });
2320 free(compositionIdentifier);
2321 free(nodeIdentifier);
2322 break;
2323 }
2325 {
2326 char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2327 char *nodeIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2328 dispatch_sync(delegateQueue, ^{
2329 if (delegate)
2330 delegate->receivedTelemetryNodeExecutionFinished(compositionIdentifier, nodeIdentifier);
2331 });
2332 free(compositionIdentifier);
2333 free(nodeIdentifier);
2334 break;
2335 }
2337 {
2338 while (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2339 {
2340 char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2341 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2342 {
2343 char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2344 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2345 {
2346 bool receivedEvent = vuoReceiveBool(ZMQTelemetry, NULL);
2347 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2348 {
2349 bool receivedData = vuoReceiveBool(ZMQTelemetry, NULL);
2350 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2351 {
2352 string portDataSummary;
2353 char *s = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2354 if (s)
2355 {
2356 portDataSummary = s;
2357 free(s);
2358 }
2359 else
2360 portDataSummary = "";
2361
2362 dispatch_sync(delegateQueue, ^{
2363 if (delegate)
2364 delegate->receivedTelemetryInputPortUpdated(compositionIdentifier, portIdentifier, receivedEvent, receivedData, portDataSummary);
2365 });
2366 }
2367 }
2368 }
2369 free(portIdentifier);
2370 }
2371 free(compositionIdentifier);
2372 }
2373 break;
2374 }
2376 {
2377 while (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2378 {
2379 char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2380 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2381 {
2382 char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2383 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2384 {
2385 bool sentEvent = vuoReceiveBool(ZMQTelemetry, NULL);
2386 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2387 {
2388 bool sentData = vuoReceiveBool(ZMQTelemetry, NULL);
2389 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2390 {
2391 string portDataSummary;
2392 char *s = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2393 if (s)
2394 {
2395 portDataSummary = s;
2396 free(s);
2397 }
2398 else
2399 portDataSummary = "";
2400
2401 dispatch_sync(delegateQueue, ^{
2402 if (delegate)
2403 delegate->receivedTelemetryOutputPortUpdated(compositionIdentifier, portIdentifier, sentEvent, sentData, portDataSummary);
2404 });
2405 }
2406 }
2407 }
2408 free(portIdentifier);
2409 }
2410 free(compositionIdentifier);
2411 }
2412 break;
2413 }
2415 {
2416 while (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2417 {
2418 char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2419 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2420 {
2421 bool sentData = vuoReceiveBool(ZMQTelemetry, NULL);
2422 if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2423 {
2424 string portDataSummary;
2425 char *s = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2426 if (s)
2427 {
2428 portDataSummary = s;
2429 free(s);
2430 }
2431 else
2432 portDataSummary = "";
2433
2434 Port *port = getPublishedOutputPortWithName(portIdentifier);
2435
2436 dispatch_sync(delegateQueue, ^{
2437 if (delegate)
2438 delegate->receivedTelemetryPublishedOutputPortUpdated(port, sentData, portDataSummary);
2439 });
2440 }
2441 }
2442 free(portIdentifier);
2443 }
2444 break;
2445 }
2447 {
2448 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2449 break;
2450 }
2452 {
2453 char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2454 char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2455 dispatch_sync(delegateQueue, ^{
2456 if (delegate)
2457 delegate->receivedTelemetryEventDropped(compositionIdentifier, portIdentifier);
2458 });
2459 free(compositionIdentifier);
2460 free(portIdentifier);
2461 break;
2462 }
2463 case VuoTelemetryError:
2464 {
2465 char *message = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2466 dispatch_sync(delegateQueue, ^{
2467 if (delegate)
2468 delegate->receivedTelemetryError( string(message) );
2469 });
2470 free(message);
2471 break;
2472 }
2474 {
2475 dispatch_sync(delegateQueue, ^{
2476 if (delegate)
2477 delegate->lostContactWithComposition();
2478 });
2479 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2480 stop();
2481 });
2482 break;
2483 }
2484 default:
2485 VUserLog("Error: Unknown telemetry message type: %d", type);
2486 break;
2487 }
2488 }
2489 else if (! listenCanceled) // Either the 1-second timeout elapsed, or we got a stop-listening message from ZMQSelfSend
2490 {
2491 if (items[1].revents & ZMQ_POLLIN)
2492 {
2493 // This is a stop-listening message.
2494 vuoReceiveInt(ZMQSelfReceive, NULL);
2495
2496 // Drain any remaining telemetry messages.
2497 pendingCancel = true;
2498 }
2499
2500 else if (pendingCancel)
2501 listenCanceled = true;
2502
2503 else
2504 {
2505 // Timeout.
2506 // Could happen if the composition crashed, or if the system fell asleep then hibernated (standby mode).
2507 // If it's a crash we should disconnect; if it's hibernation we should ignore the timeout and try zmq_poll again.
2509 VDebugLog("zmq_poll timed out, but system is sleeping so I'll try again.");
2510 else if (VuoLog_isDebuggerAttached())
2511 VDebugLog("zmq_poll timed out, but a debugger is attached to the host so I'll try again.");
2512 else
2513 {
2514 listenCanceled = true;
2515 string dir, file, ext;
2516 VuoFileUtilities::splitPath(executablePath, dir, file, ext);
2517 stopBecauseLostContact("The connection between the composition ('" + file + "') and runner timed out while listening for telemetry.");
2518 }
2519 }
2520 }
2521 }
2522
2523 zmq_close(ZMQTelemetry);
2524 ZMQTelemetry = NULL;
2525
2526 zmq_close(ZMQSelfSend);
2527 ZMQSelfSend = NULL;
2528 zmq_close(ZMQSelfReceive);
2529 ZMQSelfReceive = NULL;
2530
2531 dispatch_semaphore_signal(endedListeningSemaphore);
2532 return;
2533}
2534
2544void VuoRunner::vuoControlRequestSend(enum VuoControlRequest request, zmq_msg_t *messages, unsigned int messageCount)
2545{
2546 char *error = NULL;
2547 vuoSend("runner VuoControl",ZMQControl,request,messages,messageCount,false,&error);
2548
2549 if (error)
2550 {
2551 string e(error);
2552 free(error);
2553 throw VuoException(e);
2554 }
2555}
2556
2566void VuoRunner::vuoLoaderControlRequestSend(enum VuoLoaderControlRequest request, zmq_msg_t *messages, unsigned int messageCount)
2567{
2568 char *error = NULL;
2569 vuoSend("runner VuoLoaderControl",ZMQLoaderControl,request,messages,messageCount,false,&error);
2570
2571 if (error)
2572 {
2573 string e(error);
2574 free(error);
2575 throw VuoException(e);
2576 }
2577}
2578
2587void VuoRunner::vuoControlReplyReceive(enum VuoControlReply expectedReply)
2588{
2589 char *error = NULL;
2590 int reply = vuoReceiveInt(ZMQControl, &error);
2591
2592 if (error)
2593 {
2594 string e(error);
2595 free(error);
2596 ostringstream oss;
2597 oss << e << " (expected " << expectedReply << ")";
2598 throw VuoException(oss.str());
2599 }
2600 else if (reply != expectedReply)
2601 {
2602 ostringstream oss;
2603 oss << "The runner received the wrong message from the composition (expected " << expectedReply << ", received " << reply << ")";
2604 throw VuoException(oss.str());
2605 }
2606}
2607
2616void VuoRunner::vuoLoaderControlReplyReceive(enum VuoLoaderControlReply expectedReply)
2617{
2618 char *error = NULL;
2619 int reply = vuoReceiveInt(ZMQLoaderControl, &error);
2620
2621 if (error)
2622 {
2623 string e(error);
2624 free(error);
2625 ostringstream oss;
2626 oss << e << " (expected " << expectedReply << ")";
2627 throw VuoException(oss.str());
2628 }
2629 else if (reply != expectedReply)
2630 {
2631 ostringstream oss;
2632 oss << "The runner received the wrong message from the composition loader (expected " << expectedReply << ", received " << reply << ")";
2633 throw VuoException(oss.str());
2634 }
2635}
2636
2642string VuoRunner::receiveString(string fallbackIfNull)
2643{
2644 char *error = NULL;
2645 char *s = vuoReceiveAndCopyString(ZMQControl, &error);
2646
2647 if (error)
2648 {
2649 string e(error);
2650 free(error);
2651 throw VuoException(e);
2652 }
2653
2654 string ret;
2655 if (s)
2656 {
2657 ret = s;
2658 free(s);
2659 }
2660 else
2661 ret = fallbackIfNull;
2662
2663 return ret;
2664}
2665
2669vector<string> VuoRunner::receiveListOfStrings(void)
2670{
2671 vector<string> messageStrings;
2673 {
2674 string s = receiveString("");
2675 messageStrings.push_back(s);
2676 }
2677 return messageStrings;
2678}
2679
2685void VuoRunner::saturating_semaphore_signal(dispatch_semaphore_t dsema, bool *signaled)
2686{
2687 if (__sync_bool_compare_and_swap(signaled, false, true))
2688 dispatch_semaphore_signal(dsema);
2689}
2690
2696void VuoRunner::saturating_semaphore_wait(dispatch_semaphore_t dsema, bool *signaled)
2697{
2698 *signaled = false;
2699 dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
2700}
2701
2706{
2707 return stopped;
2708}
2709
2713bool VuoRunner::isInCurrentProcess(void)
2714{
2715 return executablePath.empty();
2716}
2717
2722bool VuoRunner::isUsingCompositionLoader(void)
2723{
2724 return ! executablePath.empty() && ! dylibPath.empty();
2725}
2726
2731{
2732 dispatch_sync(delegateQueue, ^{
2733 this->delegate = delegate;
2734 });
2735}
2736
2740void VuoRunner::stopBecauseLostContact(string errorMessage)
2741{
2742 __block bool alreadyLostContact;
2743 dispatch_sync(delegateQueue, ^{
2744 alreadyLostContact = lostContact;
2745 lostContact = true;
2746 });
2747
2748 if (alreadyLostContact)
2749 return;
2750
2751 saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2752
2753 dispatch_sync(delegateQueue, ^{
2754 if (delegate)
2755 delegate->lostContactWithComposition();
2756 });
2757
2758 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2759 stop();
2760 });
2761
2762 if (! isInCurrentProcess())
2763 {
2764 // Normally, stop() is responsible for terminating the ZMQ context.
2765 // But, if stopBecauseLostContact() is called, it takes the responsibility away from stop().
2766 // If there's an in-progress zmq_recv() call, stop() will get stuck waiting on controlQueue, so
2767 // the below call to terminate the ZMQ context interrupts zmq_recv() and allows stop() to proceed.
2768 dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2770
2771 zmq_term(ZMQContext);
2772 ZMQContext = NULL;
2773 dispatch_semaphore_signal(terminatedZMQContextSemaphore);
2774 });
2775 }
2776
2777 VUserLog("%s", errorMessage.c_str());
2778}
2779
2786{
2787 return compositionPid;
2788}
2789
2797VuoRunner::Port::Port(string name, string type, json_object *details)
2798{
2799 this->name = name;
2800 this->type = type;
2801 this->details = details;
2802}
2803
2808{
2809 return name;
2810}
2811
2816{
2817 return type;
2818}
2819
2848{
2849 return details;
2850}
2851
2852VuoRunnerDelegate::~VuoRunnerDelegate() { } // Fixes "undefined symbols" error