Vuo  2.3.1
VuoRunner.cc
Go to the documentation of this file.
1 
10 #include "VuoRunner.hh"
11 #include "VuoFileUtilities.hh"
12 #include "VuoStringUtilities.hh"
13 #include "VuoEventLoop.h"
14 #include "VuoException.hh"
15 #include "VuoRuntime.h"
17 
18 #include <CoreServices/CoreServices.h>
19 #include <pthread.h>
20 #include <stdio.h>
21 #include <dlfcn.h>
22 #include <sstream>
23 #include <copyfile.h>
24 #include <mach-o/loader.h>
25 #include <sys/proc_info.h>
26 #include <sys/stat.h>
27 
28 void *VuoApp_mainThread = NULL;
29 static const char *mainThreadChecker = "/Applications/Xcode.app/Contents/Developer/usr/lib/libMainThreadChecker.dylib";
31 static bool VuoRunner_isHostVDMX = false;
32 
37 static void VuoRunner_closeOnExec(int fd)
38 {
39  int flags = fcntl(fd, F_GETFD);
40  if (flags < 0)
41  {
42  VUserLog("Error: Couldn't get flags for desciptor %d: %s", fd, strerror(errno));
43  return;
44  }
45 
46  flags |= FD_CLOEXEC;
47 
48  if (fcntl(fd, F_SETFD, flags) != 0)
49  VUserLog("Error: Couldn't set FD_CLOEXEC on descriptor %d: %s", fd, strerror(errno));
50 }
51 
55 static void __attribute__((constructor)) VuoRunner_init()
56 {
57  VuoApp_mainThread = (void *)pthread_self();
58 
59 #pragma clang diagnostic push
60 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
61  // Calls _TSGetMainThread().
62  // https://b33p.net/kosada/node/12944
63  YieldToAnyThread();
64 #pragma clang diagnostic pop
65 
67 
68  // Ensure that the write end of this pipe gets closed upon fork()/exec(),
69  // so child processes don't prop open this pipe,
70  // which would prevent Vuo compositions from quitting when the VuoRunner process quits.
72 
73  if (VuoStringUtilities::makeFromCFString(CFBundleGetIdentifier(CFBundleGetMainBundle())) == "com.vidvox.VDMX5")
74  VuoRunner_isHostVDMX = true;
75 
77 }
78 
82 static bool isMainThread(void)
83 {
84  return VuoApp_mainThread == (void *)pthread_self();
85 }
86 
90 static void VuoRunner_configureSocket(void *zmqSocket, int timeoutInSeconds)
91 {
92  if (timeoutInSeconds >= 0)
93  {
94  int timeoutInMilliseconds = timeoutInSeconds * 1000;
95  zmq_setsockopt(zmqSocket, ZMQ_RCVTIMEO, &timeoutInMilliseconds, sizeof timeoutInMilliseconds);
96  zmq_setsockopt(zmqSocket, ZMQ_SNDTIMEO, &timeoutInMilliseconds, sizeof timeoutInMilliseconds);
97  }
98 
99  int linger = 0; // avoid having zmq_term block if the runner has tried to send a message on a broken connection
100  zmq_setsockopt(zmqSocket, ZMQ_LINGER, &linger, sizeof linger);
101 }
102 
107 {
108 public:
109  Private() :
110  lastWidth(0),
111  lastHeight(0)
112  {
113  }
114 
116  typedef void *(*vuoImageMakeFromJsonWithDimensionsType)(json_object *, unsigned int, unsigned int);
118  typedef json_object *(*vuoImageGetJsonType)(void *);
120 
121  uint64_t lastWidth;
122  uint64_t lastHeight;
123 };
124 
135 VuoRunner * VuoRunner::newSeparateProcessRunnerFromExecutable(string executablePath, string sourceDir,
136  bool continueIfRunnerDies, bool deleteExecutableWhenFinished)
137 {
138  VuoRunner * vr = new VuoRunner();
139  vr->executablePath = executablePath;
140  vr->shouldContinueIfRunnerDies = continueIfRunnerDies;
141  vr->shouldDeleteBinariesWhenFinished = deleteExecutableWhenFinished;
142  vr->sourceDir = sourceDir;
143  return vr;
144 }
145 
161 VuoRunner * VuoRunner::newSeparateProcessRunnerFromDynamicLibrary(string compositionLoaderPath, string compositionDylibPath,
162  const std::shared_ptr<VuoRunningCompositionLibraries> &runningCompositionLibraries,
163  string sourceDir, bool continueIfRunnerDies, bool deleteDylibsWhenFinished)
164 {
165  VuoRunner * vr = new VuoRunner();
166  vr->executablePath = compositionLoaderPath;
167  vr->dylibPath = compositionDylibPath;
168  vr->dependencyLibraries = runningCompositionLibraries;
169  vr->sourceDir = sourceDir;
170  vr->shouldContinueIfRunnerDies = continueIfRunnerDies;
171  vr->shouldDeleteBinariesWhenFinished = deleteDylibsWhenFinished;
172  runningCompositionLibraries->setDeleteResourceLibraries(deleteDylibsWhenFinished);
173  return vr;
174 }
175 
186  bool deleteDylibWhenFinished)
187 {
188  VuoRunner * vr = new VuoRunner();
189  vr->dylibPath = dylibPath;
190  vr->shouldDeleteBinariesWhenFinished = deleteDylibWhenFinished;
191  vr->sourceDir = sourceDir;
192  return vr;
193 }
194 
201 {
202  dispatch_release(stoppedSemaphore);
203  dispatch_release(terminatedZMQContextSemaphore);
204  dispatch_release(beganListeningSemaphore);
205  dispatch_release(endedListeningSemaphore);
206  dispatch_release(lastFiredEventSemaphore);
207  dispatch_release(delegateQueue);
208  delete p;
209 }
210 
215 void VuoRunner::setRuntimeChecking(bool runtimeCheckingEnabled)
216 {
217  if (!stopped)
218  {
219  VUserLog("Error: Only call VuoRunner::setRuntimeChecking() prior to starting the composition.");
220  return;
221  }
222 
223  isRuntimeCheckingEnabled = runtimeCheckingEnabled && VuoFileUtilities::fileExists(mainThreadChecker);
224 }
225 
229 VuoRunner::VuoRunner(void)
230 {
231  p = new Private;
232  dylibHandle = NULL;
233  dependencyLibraries = NULL;
234  shouldContinueIfRunnerDies = false;
235  shouldDeleteBinariesWhenFinished = false;
236  isRuntimeCheckingEnabled = false;
237  paused = true;
238  stopped = true;
239  lostContact = false;
240  listenCanceled = false;
241  stoppedSemaphore = dispatch_semaphore_create(1);
242  terminatedZMQContextSemaphore = dispatch_semaphore_create(0);
243  beganListeningSemaphore = dispatch_semaphore_create(0);
244  endedListeningSemaphore = dispatch_semaphore_create(1);
245  lastFiredEventSemaphore = dispatch_semaphore_create(0);
246  lastFiredEventSignaled = false;
247  controlQueue = dispatch_queue_create("org.vuo.runner.control", NULL);
248  ZMQContext = NULL;
249  ZMQSelfSend = NULL;
250  ZMQSelfReceive = NULL;
251  ZMQControl = NULL;
252  ZMQTelemetry = NULL;
253  ZMQLoaderControl = NULL;
254  delegate = NULL;
255  delegateQueue = dispatch_queue_create("org.vuo.runner.delegate", NULL);
256  arePublishedInputPortsCached = false;
257  arePublishedOutputPortsCached = false;
258 }
259 
276 {
277  try
278  {
279  startInternal();
280 
281  if (isInCurrentProcess())
282  {
283  dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
284  dispatch_async(queue, ^{
285  unpause();
286  });
287  while (paused)
288  {
290  usleep(USEC_PER_SEC / 1000);
291  }
292  }
293  else
294  {
295  unpause();
296  }
297  }
298  catch (VuoException &e)
299  {
300  stopBecauseLostContact(e.what());
301  }
302 }
303 
323 {
324  try
325  {
326  startInternal();
327  }
328  catch (VuoException &e)
329  {
330  stopBecauseLostContact(e.what());
331  }
332 }
333 
338 void VuoRunner::copyDylibAndChangeId(string dylibPath, string &outputDylibPath)
339 {
340  string directory, file, extension;
341  VuoFileUtilities::splitPath(dylibPath, directory, file, extension);
342 
343  const int makeTmpFileExtension = 7;
344  if (file.length() > makeTmpFileExtension)
345  {
346  // makeTmpFile() appends "-XXXXXX"; make room for that.
347  string trimmedFile = file.substr(0, file.length() - makeTmpFileExtension);
348 
349  bool alreadyLoaded;
350  do
351  {
352  outputDylibPath = VuoFileUtilities::makeTmpFile(trimmedFile, "dylib");
353  alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
354  } while (alreadyLoaded);
355  }
356  else
357  {
358  // For short names, like those generated by VDMX, just replace the entire name with a hash.
359  // https://b33p.net/kosada/node/12917
360  bool alreadyLoaded;
361  do
362  {
363  string hash = VuoStringUtilities::makeRandomHash(file.length());
364  outputDylibPath = "/tmp/" + hash + ".dylib";
365  alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
366  } while (alreadyLoaded);
367  }
368 
369  string newDirectory, newFile, newExtension;
370  VuoFileUtilities::splitPath(outputDylibPath, newDirectory, newFile, newExtension);
371 
372  if (newFile.length() > file.length())
373  throw VuoException("The composition couldn't start because the uniqued dylib name (" + newFile + ") is longer than the original dylib name (" + file + ").");
374 
375  if (copyfile(dylibPath.c_str(), outputDylibPath.c_str(), NULL, COPYFILE_ALL))
376  throw VuoException("The composition couldn't start because a copy of the dylib couldn't be made.");
377 
378  FILE *fp = fopen(outputDylibPath.c_str(), "r+b");
379  if (!fp)
380  throw VuoException("The composition couldn't start because the dylib's header couldn't be opened.");
381  VuoDefer(^{ fclose(fp); });
382 
383  struct mach_header_64 header;
384  if (fread(&header, sizeof(header), 1, fp) != 1)
385  throw VuoException("The composition couldn't start because the dylib's header couldn't be read.");
386 
387  if (header.magic != MH_MAGIC_64
388  || header.cputype != CPU_TYPE_X86_64)
389  throw VuoException("The composition couldn't start because the dylib isn't an x86_64-only (non-fat) Mach-O binary.");
390 
391  for (int i = 0; i < header.ncmds; ++i)
392  {
393  struct load_command lc;
394  if (fread(&lc, sizeof(lc), 1, fp) != 1)
395  throw VuoException("The composition couldn't start because the dylib's command couldn't be read.");
396 
397  // VLog("cmd[%d]: %x (size %d)",i,lc.cmd,lc.cmdsize);
398  if (lc.cmd == LC_ID_DYLIB)
399  {
400  fseek(fp, sizeof(struct dylib), SEEK_CUR);
401 
402  size_t nameLength = lc.cmdsize - sizeof(struct dylib_command);
403  char *name = (char *)calloc(nameLength + 1, 1);
404  if (fread(name, nameLength, 1, fp) != 1)
405  throw VuoException("The composition couldn't start because the dylib's ID command couldn't be read.");
406 
407 // VLog("Changing name \"%s\" to \"%s\"…", name, outputDylibPath.c_str());
408  fseek(fp, -nameLength, SEEK_CUR);
409  bzero(name, nameLength);
410  memcpy(name, outputDylibPath.c_str(), min(nameLength, outputDylibPath.length()));
411  fwrite(name, nameLength, 1, fp);
412  return;
413  }
414  else
415  fseek(fp, lc.cmdsize-sizeof(lc), SEEK_CUR);
416  }
417 
418  throw VuoException("The composition couldn't start because the dylib's LC_ID_DYLIB command couldn't be found.");
419 }
420 
430 void VuoRunner::startInternal(void)
431 {
432  stopped = false;
433  dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
434 
435  ZMQContext = zmq_init(1);
436 
437  if (isInCurrentProcess())
438  {
439  // Start the composition in the current process.
440 
441  bool alreadyLoaded = dlopen(dylibPath.c_str(), RTLD_NOLOAD);
442  if (alreadyLoaded)
443  {
444  // Each composition instance needs its own global variables.
445  // Change the dylib's internal name, to convince dlopen() to load another instance of it.
446 
447  string uniquedDylibPath;
448  copyDylibAndChangeId(dylibPath, uniquedDylibPath);
449  VDebugLog("\"%s\" is already loaded, so I duplicated it and changed its LC_ID_DYLIB to \"%s\".", dylibPath.c_str(), uniquedDylibPath.c_str());
450 
451  if (shouldDeleteBinariesWhenFinished)
452  remove(dylibPath.c_str());
453 
454  dylibPath = uniquedDylibPath;
455  shouldDeleteBinariesWhenFinished = true;
456  }
457 
458  dylibHandle = dlopen(dylibPath.c_str(), RTLD_NOW);
459  if (!dylibHandle)
460  throw VuoException("The composition couldn't start because the library '" + dylibPath + "' couldn't be loaded : " + dlerror());
461 
462  try
463  {
465  if (! vuoInitInProcess)
466  throw VuoException("The composition couldn't start because vuoInitInProcess() couldn't be found in '" + dylibPath + "' : " + dlerror());
467 
468  ZMQControlURL = "inproc://" + VuoFileUtilities::makeTmpFile("vuo-control", "");
469  ZMQTelemetryURL = "inproc://" + VuoFileUtilities::makeTmpFile("vuo-telemetry", "");
470 
471  vuoInitInProcess(ZMQContext, ZMQControlURL.c_str(), ZMQTelemetryURL.c_str(), true, getpid(), -1, false,
472  sourceDir.c_str(), dylibHandle, NULL, false);
473  }
474  catch (VuoException &e)
475  {
476  VUserLog("error: %s", e.what());
477  dlclose(dylibHandle);
478  dylibHandle = NULL;
479  throw;
480  }
481  }
482  else
483  {
484  // Start the composition or composition loader in a new process.
485 
486  vector<string> args;
487 
488  string executableName;
489  if (isUsingCompositionLoader())
490  {
491  // If we're using the loader, set the executable's display name to the dylib,
492  // so that composition's name shows up in the process list.
493  string dir, file, ext;
494  VuoFileUtilities::splitPath(dylibPath, dir, file, ext);
495  executableName = file;
496  }
497  else
498  {
499  string dir, file, ext;
500  VuoFileUtilities::splitPath(executablePath, dir, file, ext);
501  string executableName = file;
502  if (! ext.empty())
503  executableName += "." + ext;
504  }
505  args.push_back(executableName);
506 
507  // https://b33p.net/kosada/node/16374
508  // The socket's full pathname (`sockaddr_un::sun_path`) must be 104 characters or less
509  // (https://opensource.apple.com/source/xnu/xnu-2782.1.97/bsd/sys/un.h.auto.html).
510  // "/Users/me/Library/Containers/com.apple.ScreenSaver.Engine.legacyScreenSaver/Data/vuo-telemetry-rr8Br3"
511  // is 101 characters, which limits the username to 5 characters.
512  // "/Users/me/Library/Containers/com.apple.ScreenSaver.Engine.legacyScreenSaver/Data/v-rr8Br3"
513  // is 89 characters, which limits the username to 17 characters
514  // (still not a lot, but more likely to work with typical macOS usernames).
515  ZMQControlURL = "ipc://" + VuoFileUtilities::makeTmpFile("v", "");
516  ZMQTelemetryURL = "ipc://" + VuoFileUtilities::makeTmpFile("v", "");
517  args.push_back("--vuo-control=" + ZMQControlURL);
518  args.push_back("--vuo-telemetry=" + ZMQTelemetryURL);
519 
520  {
521  ostringstream oss;
522  oss << getpid();
523  args.push_back("--vuo-runner-pid=" + oss.str());
524  }
525 
526  {
527  ostringstream oss;
529  args.push_back("--vuo-runner-pipe=" + oss.str());
530  }
531 
532  if (shouldContinueIfRunnerDies)
533  args.push_back("--vuo-continue-if-runner-dies");
534 
535  if (isUsingCompositionLoader())
536  {
537  ZMQLoaderControlURL = "ipc://" + VuoFileUtilities::makeTmpFile("v", "");
538  args.push_back("--vuo-loader=" + ZMQLoaderControlURL);
539  }
540  else
541  args.push_back("--vuo-pause");
542 
543  int fd[2];
544  int ret = pipe(fd);
545  if (ret)
546  throw VuoException("The composition couldn't start because a pipe couldn't be opened : " + string(strerror(errno)));
547 
548  int argSize = args.size();
549  char *argv[argSize + 1];
550  for (size_t i = 0; i < argSize; ++i)
551  {
552  size_t mallocSize = args[i].length() + 1;
553  argv[i] = (char *)malloc(mallocSize);
554  strlcpy(argv[i], args[i].c_str(), mallocSize);
555  }
556  argv[argSize] = NULL;
557 
558  string errorWorkingDirectory = "The composition couldn't start because the working directory couldn't be changed to '" + sourceDir + "' : ";
559  string errorExecutable = "The composition couldn't start because the file '" + executablePath + "' couldn't be executed : ";
560  string errorFork = "The composition couldn't start because the composition process couldn't be forked : ";
561  const size_t ERROR_BUFFER_LEN = 256;
562  char errorBuffer[ERROR_BUFFER_LEN];
563 
564  pipe(runnerReadCompositionWritePipe);
565 
566  pid_t childPid = fork();
567  if (childPid == 0)
568  {
569  // There are only a limited set of functions you're allowed to call in the child process
570  // after fork() and before exec(). Functions such as VUserLog() and exit() aren't allowed,
571  // so instead we're calling alternatives such as write() and _exit().
572 
573  close(runnerReadCompositionWritePipe[0]);
574 
575  pid_t grandchildPid = fork();
576  if (grandchildPid == 0)
577  {
578  close(fd[0]);
579  close(fd[1]);
580 
581  // Set the current working directory to that of the source .vuo composition so that
582  // relative URL paths are resolved correctly.
583  if (!sourceDir.empty())
584  {
585  ret = chdir(sourceDir.c_str());
586  if (ret)
587  {
588  strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
589  write(STDERR_FILENO, errorWorkingDirectory.c_str(), errorWorkingDirectory.length());
590  write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
591  write(STDERR_FILENO, "\n", 1);
592  _exit(-1);
593  }
594  }
595 
596  if (isRuntimeCheckingEnabled)
597  setenv("DYLD_INSERT_LIBRARIES", mainThreadChecker, 1);
598 
599  ret = execv(executablePath.c_str(), argv);
600  if (ret)
601  {
602  strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
603  write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
604  write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
605  write(STDERR_FILENO, "\n", 1);
606  for (size_t i = 0; i < argSize; ++i)
607  free(argv[i]);
608  _exit(-1);
609  }
610  }
611  else if (grandchildPid > 0)
612  {
613  close(fd[0]);
614 
615  int ret = write(fd[1], &grandchildPid, sizeof(pid_t));
616  if (ret != sizeof(pid_t))
617  {
618  strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
619  write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
620  write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
621  write(STDERR_FILENO, "\n", 1);
622  }
623  close(fd[1]);
624 
625  _exit(0);
626  }
627  else
628  {
629  close(fd[0]);
630  close(fd[1]);
631 
632  strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
633  write(STDERR_FILENO, errorFork.c_str(), errorFork.length());
634  write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
635  write(STDERR_FILENO, "\n", 1);
636  _exit(-1);
637  }
638  }
639  else if (childPid > 0)
640  {
641  close(fd[1]);
642 
643  // If this process launches compositions in addition to this one,
644  // ensure they don't prop open this pipe,
645  // which would prevent VuoRunner::stop's `read()` from terminating.
646  VuoRunner_closeOnExec(runnerReadCompositionWritePipe[1]);
647 
648  for (size_t i = 0; i < argSize; ++i)
649  free(argv[i]);
650 
651  pid_t grandchildPid = 0;
652  int ret = read(fd[0], &grandchildPid, sizeof(pid_t));
653  if (ret != sizeof(pid_t))
654  throw VuoException("The composition couldn't start because the composition process id couldn't be obtained: " + string(strerror(errno)));
655  close(fd[0]);
656 
657  // Reap the child process.
658  int status;
659  do {
660  ret = waitpid(childPid, &status, 0);
661  } while (ret == -1 && errno == EINTR);
662  if (WIFEXITED(status) && WEXITSTATUS(status))
663  throw VuoException("The composition couldn't start because the parent of the composition process exited with an error.");
664  else if (WIFSIGNALED(status))
665  throw VuoException("The composition couldn't start because the parent of the composition process exited abnormally : " + string(strsignal(WTERMSIG(status))));
666 
667  if (grandchildPid > 0)
668  compositionPid = grandchildPid;
669  else
670  throw VuoException("The composition couldn't start because the composition process id couldn't be obtained");
671  }
672  else
673  {
674  for (size_t i = 0; i < argSize; ++i)
675  free(argv[i]);
676 
677  throw VuoException("The composition couldn't start because the parent of the composition process couldn't be forked : " + string(strerror(errno)));
678  }
679  }
680 
681  // Connect to the composition loader (if any) and composition.
682  if (isUsingCompositionLoader())
683  {
684  ZMQLoaderControl = zmq_socket(ZMQContext,ZMQ_REQ);
686 
687  // Try to connect to the composition loader. If at first we don't succeed, give the composition loader a little more time to set up the socket.
688  int numTries = 0;
689  while (zmq_connect(ZMQLoaderControl,ZMQLoaderControlURL.c_str()))
690  {
691  if (++numTries == 1000)
692  throw VuoException("The composition couldn't start because the runner couldn't establish communication with the composition loader : " + string(strerror(errno)));
693  usleep(USEC_PER_SEC / 1000);
694  }
695 
696  replaceComposition(dylibPath, "");
697  }
698  else
699  {
700  __block string errorMessage;
701  dispatch_sync(controlQueue, ^{
702  try {
703  setUpConnections();
704  } catch (VuoException &e) {
705  errorMessage = e.what();
706  }
707  });
708  if (! errorMessage.empty())
709  throw VuoException(errorMessage);
710  }
711 }
712 
717 void *VuoRunner_listen(void *context)
718 {
719  pthread_detach(pthread_self());
720  VuoRunner *runner = static_cast<VuoRunner *>(context);
721  runner->listen();
722  return NULL;
723 }
724 
730 void VuoRunner::setUpConnections(void)
731 {
732  ZMQControl = zmq_socket(ZMQContext,ZMQ_REQ);
734 
735  // Try to connect to the composition. If at first we don't succeed, give the composition a little more time to set up the socket.
736  int numTries = 0;
737  while (zmq_connect(ZMQControl,ZMQControlURL.c_str()))
738  {
739  if (++numTries == 1000)
740  throw VuoException("The composition couldn't start because the runner couldn't establish communication to control the composition : " + string(strerror(errno)));
741  usleep(USEC_PER_SEC / 1000);
742  }
743 
744  // Cache published ports so they're available whenever a caller starts listening for published port value changes.
745  arePublishedInputPortsCached = false;
746  arePublishedOutputPortsCached = false;
747  if (isInCurrentProcess())
748  {
749  dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
750  __block string publishedPortsError;
751  dispatch_async(queue, ^{
752  try {
753  getCachedPublishedPorts(false);
754  getCachedPublishedPorts(true);
755  } catch (VuoException &e) {
756  publishedPortsError = e.what();
757  }
758  });
759  while (! (arePublishedInputPortsCached && arePublishedOutputPortsCached) )
760  {
762  usleep(USEC_PER_SEC / 1000);
763 
764  if (! publishedPortsError.empty())
765  throw VuoException(publishedPortsError);
766  }
767  }
768  else
769  {
770  getCachedPublishedPorts(false);
771  getCachedPublishedPorts(true);
772  }
773 
774  listenCanceled = false;
775  dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
776 
777  pthread_t listenThread;
778  int ret = pthread_create(&listenThread, nullptr, &VuoRunner_listen, this);
779  if (ret)
780  throw VuoException(string("The composition couldn't start because the runner couldn't create a thread: ") + strerror(ret));
781 
782  dispatch_semaphore_wait(beganListeningSemaphore, DISPATCH_TIME_FOREVER);
783  if (!listenError.empty())
784  throw VuoException("The composition couldn't start because the runner couldn't establish communication to listen to the composition: " + listenError);
785 
788 }
789 
805 {
806  if (! isInCurrentProcess())
807  throw VuoException("The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
808 
809  if (! isMainThread())
810  throw VuoException("This is not the main thread. Only call this function from the main thread.");
811 
812  while (! stopped)
813  VuoEventLoop_processEvent(VuoEventLoop_WaitIndefinitely);
814 }
815 
841 {
842  if (! isInCurrentProcess())
843  throw VuoException("The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
844 
845  if (! isMainThread())
846  throw VuoException("This is not the main thread. Only call this function from the main thread.");
847 
848  VuoEventLoop_processEvent(VuoEventLoop_RunOnce);
849 }
850 
859 {
860  dispatch_sync(controlQueue, ^{
861  if (stopped || lostContact) {
862  return;
863  }
864 
866 
867  try
868  {
871  }
872  catch (VuoException &e)
873  {
874  stopBecauseLostContact(e.what());
875  }
876 
877  paused = true;
878  });
879 }
880 
887 {
888  dispatch_sync(controlQueue, ^{
889  if (stopped || lostContact) {
890  return;
891  }
892 
894 
895  try
896  {
899  }
900  catch (VuoException &e)
901  {
902  stopBecauseLostContact(e.what());
903  }
904 
905  paused = false;
906  });
907 }
908 
928 void VuoRunner::replaceComposition(string compositionDylibPath, string compositionDiff)
929 {
930  if (! isUsingCompositionLoader())
931  throw VuoException("The runner is not using a composition loader. Only use this function if the composition was constructed with newSeparateProcessRunnerFromDynamicLibrary().");
932 
933  dispatch_sync(controlQueue, ^{
934  if (stopped || lostContact) {
935  return;
936  }
937 
938  VDebugLog("Loading composition…");
939 
940  if (dylibPath != compositionDylibPath)
941  {
942  if (shouldDeleteBinariesWhenFinished)
943  {
944  remove(dylibPath.c_str());
945  }
946 
947  dylibPath = compositionDylibPath;
948  }
949 
951 
952  try
953  {
954  if (! paused)
955  {
956  VDebugLog(" Pausing…");
959  }
960 
961  cleanUpConnections();
962 
963  vector<string> dependencyDylibPathsRemoved = dependencyLibraries->dequeueLibrariesToUnload();
964  vector<string> dependencyDylibPathsAdded = dependencyLibraries->dequeueLibrariesToLoad();
965 
966  unsigned int messageCount = 4 + dependencyDylibPathsAdded.size() + dependencyDylibPathsRemoved.size();
967  zmq_msg_t *messages = (zmq_msg_t *)malloc(messageCount * sizeof(zmq_msg_t));
968  int index = 0;
969 
970  vuoInitMessageWithString(&messages[index++], dylibPath.c_str());
971 
972  vuoInitMessageWithInt(&messages[index++], dependencyDylibPathsAdded.size());
973  for (vector<string>::iterator i = dependencyDylibPathsAdded.begin(); i != dependencyDylibPathsAdded.end(); ++i) {
974  vuoInitMessageWithString(&messages[index++], (*i).c_str());
975  }
976 
977  vuoInitMessageWithInt(&messages[index++], dependencyDylibPathsRemoved.size());
978  for (vector<string>::iterator i = dependencyDylibPathsRemoved.begin(); i != dependencyDylibPathsRemoved.end(); ++i) {
979  vuoInitMessageWithString(&messages[index++], (*i).c_str());
980  }
981 
982  vuoInitMessageWithString(&messages[index], compositionDiff.c_str());
983 
984  if (! paused)
985  VDebugLog(" Replacing composition…");
986 
987  vuoLoaderControlRequestSend(VuoLoaderControlRequestCompositionReplace,messages,messageCount);
988  vuoLoaderControlReplyReceive(VuoLoaderControlReplyCompositionReplaced);
989 
990  setUpConnections();
991 
992  if (! paused)
993  {
994  VDebugLog(" Unpausing…");
997  }
998 
999  VDebugLog(" Done.");
1000  }
1001  catch (VuoException &e)
1002  {
1003  stopBecauseLostContact(e.what());
1004  }
1005  });
1006 }
1007 
1022 {
1023  dispatch_sync(controlQueue, ^{
1024  if (stopped) {
1025  return;
1026  }
1027 
1028  vuoMemoryBarrier();
1029 
1030  // Only tell the composition to stop if it hasn't already ended on its own.
1031  if (! lostContact)
1032  {
1033  try
1034  {
1035  int timeoutInSeconds = (isInCurrentProcess() ? -1 : 5);
1036  zmq_msg_t messages[3];
1037  vuoInitMessageWithInt(&messages[0], timeoutInSeconds);
1038  vuoInitMessageWithBool(&messages[1], false); // isBeingReplaced
1039  vuoInitMessageWithBool(&messages[2], !isInCurrentProcess()); // isLastEverInProcess
1041 
1042  if (isInCurrentProcess() && isMainThread())
1043  {
1044  // If VuoRunner::stop() is blocking the main thread, wait for the composition's reply on another thread, and drain the main queue, in case the composition needs to shut down stuff that requires the main queue.
1045  __block bool replyReceived = false;
1046  dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
1047  vuoMemoryBarrier();
1048  try
1049  {
1051  }
1052  catch (...)
1053  {
1054  // do nothing; doesn't matter if connection timed out
1055  }
1056  replyReceived = true;
1057  });
1058  while (!replyReceived)
1059  {
1061  usleep(USEC_PER_SEC / 1000);
1062  }
1063  vuoMemoryBarrier();
1064  }
1065  else
1067  }
1068  catch (...)
1069  {
1070  // do nothing; doesn't matter if connection timed out
1071  }
1072  }
1073 
1074  cleanUpConnections();
1075 
1076  if (isUsingCompositionLoader() && ZMQLoaderControl)
1077  {
1078  zmq_close(ZMQLoaderControl);
1079  ZMQLoaderControl = NULL;
1080  }
1081 
1082  if (isInCurrentProcess() && dylibHandle)
1083  {
1084  VuoInitInProcessType *vuoInitInProcess = (VuoInitInProcessType *)dlsym(dylibHandle, "vuoInitInProcess");
1085  if (vuoInitInProcess) // Avoid double jeopardy if startInternal() already failed for missing vuoInitInProcess.
1086  {
1087  VuoFiniType *vuoFini = (VuoFiniType *)dlsym(dylibHandle, "vuoFini");
1088  if (! vuoFini)
1089  {
1090  VUserLog("The composition couldn't stop because vuoFini() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1091  return;
1092  }
1093  void *runtimeState = vuoFini();
1094 
1096  if (! vuoFiniRuntimeState)
1097  {
1098  VUserLog("The composition couldn't stop because vuoFiniRuntimeState() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1099  return;
1100  }
1102  }
1103 
1104  dlclose(dylibHandle);
1105  dylibHandle = NULL;
1106  }
1107  else if (isInCurrentProcess() && !dylibHandle)
1108  {
1109  // If the dylib isn't open, the composition isn't running, so there's nothing to clean up.
1110  }
1111  else
1112  {
1113  char buf[1];
1114  close(runnerReadCompositionWritePipe[1]);
1115 
1116  if (! lostContact)
1117  {
1118  // Wait for child process to end.
1119  // Can't use waitpid() since it only waits on child processes, yet compositionPid is a grandchild.
1120  // Instead, do a blocking read() — the grandchild never writes anything to the pipe, and when the grandchild exits,
1121  // read() will return EOF (since it was the last process that had it open for writing).
1122  read(runnerReadCompositionWritePipe[0], &buf, 1);
1123  }
1124 
1125  close(runnerReadCompositionWritePipe[0]);
1126 
1127  if (! lostContact)
1128  {
1129  zmq_term(ZMQContext);
1130  ZMQContext = NULL;
1131  }
1132  else
1133  {
1134  dispatch_semaphore_wait(terminatedZMQContextSemaphore, DISPATCH_TIME_FOREVER);
1135  }
1136  }
1137 
1138  if (shouldDeleteBinariesWhenFinished)
1139  {
1140  if (isUsingCompositionLoader())
1141  {
1142  remove(dylibPath.c_str());
1143  }
1144  else if (isInCurrentProcess())
1145  {
1146  remove(dylibPath.c_str());
1147  }
1148  else
1149  {
1150  remove(executablePath.c_str());
1151  }
1152  }
1153 
1154  dependencyLibraries = nullptr; // release shared_ptr
1155 
1156  stopped = true;
1157  dispatch_semaphore_signal(stoppedSemaphore);
1159  });
1160 }
1161 
1165 void VuoRunner::cleanUpConnections(void)
1166 {
1167  if (! ZMQControl)
1168  return;
1169 
1170  zmq_close(ZMQControl);
1171  ZMQControl = NULL;
1172 
1173  if (ZMQSelfSend)
1174  // Break out of zmq_poll().
1175  vuoSend("VuoRunner::ZMQSelfSend", ZMQSelfSend, 0, nullptr, 0, false, nullptr);
1176 
1177  dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
1178  dispatch_semaphore_signal(endedListeningSemaphore);
1179 }
1180 
1187 {
1188  dispatch_retain(stoppedSemaphore);
1189  dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
1190  dispatch_semaphore_signal(stoppedSemaphore);
1191  dispatch_release(stoppedSemaphore);
1192 }
1193 
1209 void VuoRunner::setInputPortValue(string compositionIdentifier, string portIdentifier, json_object *value)
1210 {
1211  const char *valueAsString = json_object_to_json_string_ext(value, JSON_C_TO_STRING_PLAIN);
1212 
1213  dispatch_sync(controlQueue, ^{
1214  if (stopped || lostContact) {
1215  return;
1216  }
1217 
1218  vuoMemoryBarrier();
1219 
1220  try
1221  {
1222  zmq_msg_t messages[3];
1223  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1224  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1225  vuoInitMessageWithString(&messages[2], valueAsString);
1228  }
1229  catch (VuoException &e)
1230  {
1231  stopBecauseLostContact(e.what());
1232  }
1233  });
1234 }
1235 
1249 void VuoRunner::fireTriggerPortEvent(string compositionIdentifier, string portIdentifier)
1250 {
1251  dispatch_sync(controlQueue, ^{
1252  if (stopped || lostContact) {
1253  return;
1254  }
1255 
1256  vuoMemoryBarrier();
1257 
1258  try
1259  {
1260  zmq_msg_t messages[2];
1261  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1262  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1265  }
1266  catch (VuoException &e)
1267  {
1268  stopBecauseLostContact(e.what());
1269  }
1270  });
1271 }
1272 
1286 json_object * VuoRunner::getInputPortValue(string compositionIdentifier, string portIdentifier)
1287 {
1288  __block string valueAsString;
1289  dispatch_sync(controlQueue, ^{
1290  if (stopped || lostContact) {
1291  return;
1292  }
1293 
1294  vuoMemoryBarrier();
1295 
1296  try
1297  {
1298  zmq_msg_t messages[3];
1299  vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1300  vuoInitMessageWithString(&messages[1], compositionIdentifier.c_str());
1301  vuoInitMessageWithString(&messages[2], portIdentifier.c_str());
1304  valueAsString = receiveString("null");
1305  }
1306  catch (VuoException &e)
1307  {
1308  stopBecauseLostContact(e.what());
1309  }
1310  });
1311  return json_tokener_parse(valueAsString.c_str());
1312 }
1313 
1327 json_object * VuoRunner::getOutputPortValue(string compositionIdentifier, string portIdentifier)
1328 {
1329  __block string valueAsString;
1330  dispatch_sync(controlQueue, ^{
1331  if (stopped || lostContact) {
1332  return;
1333  }
1334 
1335  vuoMemoryBarrier();
1336 
1337  try
1338  {
1339  zmq_msg_t messages[3];
1340  vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1341  vuoInitMessageWithString(&messages[1], compositionIdentifier.c_str());
1342  vuoInitMessageWithString(&messages[2], portIdentifier.c_str());
1345  valueAsString = receiveString("null");
1346  }
1347  catch (VuoException &e)
1348  {
1349  stopBecauseLostContact(e.what());
1350  }
1351  });
1352  return json_tokener_parse(valueAsString.c_str());
1353 }
1354 
1368 string VuoRunner::getInputPortSummary(string compositionIdentifier, string portIdentifier)
1369 {
1370  __block string summary;
1371  dispatch_sync(controlQueue, ^{
1372  if (stopped || lostContact) {
1373  return;
1374  }
1375 
1376  vuoMemoryBarrier();
1377 
1378  try
1379  {
1380  zmq_msg_t messages[2];
1381  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1382  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1385  summary = receiveString("");
1386  }
1387  catch (VuoException &e)
1388  {
1389  stopBecauseLostContact(e.what());
1390  }
1391  });
1392  return summary;
1393 }
1394 
1408 string VuoRunner::getOutputPortSummary(string compositionIdentifier, string portIdentifier)
1409 {
1410  __block string summary;
1411  dispatch_sync(controlQueue, ^{
1412  if (stopped || lostContact) {
1413  return;
1414  }
1415 
1416  vuoMemoryBarrier();
1417 
1418  try
1419  {
1420  zmq_msg_t messages[2];
1421  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1422  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1425  summary = receiveString("");
1426  }
1427  catch (VuoException &e)
1428  {
1429  stopBecauseLostContact(e.what());
1430  }
1431  });
1432  return summary;
1433 }
1434 
1448 string VuoRunner::subscribeToInputPortTelemetry(string compositionIdentifier, string portIdentifier)
1449 {
1450  __block string summary;
1451  dispatch_sync(controlQueue, ^{
1452  if (stopped || lostContact) {
1453  return;
1454  }
1455 
1456  vuoMemoryBarrier();
1457 
1458  try
1459  {
1460  zmq_msg_t messages[2];
1461  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1462  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1465  summary = receiveString("");
1466  }
1467  catch (VuoException &e)
1468  {
1469  stopBecauseLostContact(e.what());
1470  }
1471  });
1472  return summary;
1473 }
1474 
1488 string VuoRunner::subscribeToOutputPortTelemetry(string compositionIdentifier, string portIdentifier)
1489 {
1490  __block string summary;
1491  dispatch_sync(controlQueue, ^{
1492  if (stopped || lostContact) {
1493  return;
1494  }
1495 
1496  vuoMemoryBarrier();
1497 
1498  try
1499  {
1500  zmq_msg_t messages[2];
1501  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1502  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1505  summary = receiveString("");
1506  }
1507  catch (VuoException &e)
1508  {
1509  stopBecauseLostContact(e.what());
1510  }
1511  });
1512  return summary;
1513 }
1514 
1525 void VuoRunner::unsubscribeFromInputPortTelemetry(string compositionIdentifier, string portIdentifier)
1526 {
1527  dispatch_sync(controlQueue, ^{
1528  if (stopped || lostContact) {
1529  return;
1530  }
1531 
1532  vuoMemoryBarrier();
1533 
1534  try
1535  {
1536  zmq_msg_t messages[2];
1537  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1538  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1541  }
1542  catch (VuoException &e)
1543  {
1544  stopBecauseLostContact(e.what());
1545  }
1546  });
1547 }
1548 
1559 void VuoRunner::unsubscribeFromOutputPortTelemetry(string compositionIdentifier, string portIdentifier)
1560 {
1561  dispatch_sync(controlQueue, ^{
1562  if (stopped || lostContact) {
1563  return;
1564  }
1565 
1566  vuoMemoryBarrier();
1567 
1568  try
1569  {
1570  zmq_msg_t messages[2];
1571  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1572  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1575  }
1576  catch (VuoException &e)
1577  {
1578  stopBecauseLostContact(e.what());
1579  }
1580  });
1581 }
1582 
1593 void VuoRunner::subscribeToEventTelemetry(string compositionIdentifier)
1594 {
1595  dispatch_sync(controlQueue, ^{
1596  if (stopped || lostContact) {
1597  return;
1598  }
1599 
1600  vuoMemoryBarrier();
1601 
1602  try
1603  {
1604  zmq_msg_t messages[1];
1605  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1608  }
1609  catch (VuoException &e)
1610  {
1611  stopBecauseLostContact(e.what());
1612  }
1613  });
1614 }
1615 
1628 void VuoRunner::unsubscribeFromEventTelemetry(string compositionIdentifier)
1629 {
1630  dispatch_sync(controlQueue, ^{
1631  if (stopped || lostContact) {
1632  return;
1633  }
1634 
1635  vuoMemoryBarrier();
1636 
1637  try
1638  {
1639  zmq_msg_t messages[1];
1640  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1643  }
1644  catch (VuoException &e)
1645  {
1646  stopBecauseLostContact(e.what());
1647  }
1648  });
1649 }
1650 
1661 void VuoRunner::subscribeToAllTelemetry(string compositionIdentifier)
1662 {
1663  dispatch_sync(controlQueue, ^{
1664  if (stopped || lostContact) {
1665  return;
1666  }
1667 
1668  vuoMemoryBarrier();
1669 
1670  try
1671  {
1672  zmq_msg_t messages[1];
1673  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1676  }
1677  catch (VuoException &e)
1678  {
1679  stopBecauseLostContact(e.what());
1680  }
1681  });
1682 }
1683 
1696 void VuoRunner::unsubscribeFromAllTelemetry(string compositionIdentifier)
1697 {
1698  dispatch_sync(controlQueue, ^{
1699  if (stopped || lostContact) {
1700  return;
1701  }
1702 
1703  vuoMemoryBarrier();
1704 
1705  try
1706  {
1707  zmq_msg_t messages[1];
1708  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1711  }
1712  catch (VuoException &e)
1713  {
1714  stopBecauseLostContact(e.what());
1715  }
1716  });
1717 }
1718 
1732 void VuoRunner::setPublishedInputPortValues(map<Port *, json_object *> portsAndValuesToSet)
1733 {
1735  for (auto i : portsAndValuesToSet)
1736  {
1737  string portName = i.first->getName();
1738  if (portName == "width")
1739  p->lastWidth = json_object_get_int64(i.second);
1740  else if (portName == "height")
1741  p->lastHeight = json_object_get_int64(i.second);
1742  else if (portName == "image" || portName == "startImage")
1743  {
1744  json_object *o;
1745  if (json_object_object_get_ex(i.second, "pixelsWide", &o))
1746  p->lastWidth = json_object_get_int64(o);
1747  if (json_object_object_get_ex(i.second, "pixelsHigh", &o))
1748  p->lastHeight = json_object_get_int64(o);
1749  }
1750  }
1751 
1752  dispatch_sync(controlQueue, ^{
1753  if (stopped || lostContact) {
1754  return;
1755  }
1756 
1757  vuoMemoryBarrier();
1758 
1759  try
1760  {
1761  int messageCount = portsAndValuesToSet.size() * 2;
1762  zmq_msg_t messages[messageCount];
1763 
1764  int i = 0;
1765  for (auto &kv : portsAndValuesToSet)
1766  {
1767  vuoInitMessageWithString(&messages[i++], kv.first->getName().c_str());
1768  vuoInitMessageWithString(&messages[i++], json_object_to_json_string_ext(kv.second, JSON_C_TO_STRING_PLAIN));
1769  }
1770 
1773  }
1774  catch (VuoException &e)
1775  {
1776  stopBecauseLostContact(e.what());
1777  }
1778  });
1779 }
1780 
1789 {
1790  set<VuoRunner::Port *> portAsSet;
1791  portAsSet.insert(port);
1792  firePublishedInputPortEvent(portAsSet);
1793 }
1794 
1805 void VuoRunner::firePublishedInputPortEvent(const set<Port *> &ports)
1806 {
1807  dispatch_sync(controlQueue, ^{
1808  if (stopped || lostContact) {
1809  return;
1810  }
1811 
1812  vuoMemoryBarrier();
1813 
1814  lastFiredEventSignaled = false;
1815 
1816  try
1817  {
1818  size_t messageCount = ports.size() + 1;
1819  zmq_msg_t messages[messageCount];
1820 
1821  vuoInitMessageWithInt(&messages[0], ports.size());
1822  int i = 1;
1823  for (VuoRunner::Port *port : ports) {
1824  vuoInitMessageWithString(&messages[i++], port->getName().c_str());
1825  }
1826 
1829  }
1830  catch (VuoException &e)
1831  {
1832  stopBecauseLostContact(e.what());
1833  }
1834  });
1835 }
1836 
1862 {
1863  saturating_semaphore_wait(lastFiredEventSemaphore, &lastFiredEventSignaled);
1864 }
1865 
1877 {
1878  __block string valueAsString;
1879  dispatch_sync(controlQueue, ^{
1880  if (stopped || lostContact) {
1881  return;
1882  }
1883 
1884  vuoMemoryBarrier();
1885 
1886  try
1887  {
1888  zmq_msg_t messages[2];
1889  vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1890  vuoInitMessageWithString(&messages[1], port->getName().c_str());
1893  valueAsString = receiveString("null");
1894  }
1895  catch (VuoException &e)
1896  {
1897  stopBecauseLostContact(e.what());
1898  }
1899  });
1900  return json_tokener_parse(valueAsString.c_str());
1901 }
1902 
1914 {
1915  __block string valueAsString;
1916  dispatch_sync(controlQueue, ^{
1917  if (stopped || lostContact) {
1918  return;
1919  }
1920 
1921  vuoMemoryBarrier();
1922 
1923  try
1924  {
1925  zmq_msg_t messages[2];
1926  vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1927  vuoInitMessageWithString(&messages[1], port->getName().c_str());
1930  valueAsString = receiveString("null");
1931  }
1932  catch (VuoException &e)
1933  {
1934  stopBecauseLostContact(e.what());
1935  }
1936  });
1937 
1938  // https://b33p.net/kosada/node/17535
1939  json_object *js = json_tokener_parse(valueAsString.c_str());
1940  if (VuoRunner_isHostVDMX && port->getName() == "outputImage")
1941  {
1942  json_object *o;
1943  uint64_t actualWidth = 0;
1944  if (json_object_object_get_ex(js, "pixelsWide", &o))
1945  actualWidth = json_object_get_int64(o);
1946  uint64_t actualHeight = 0;
1947  if (json_object_object_get_ex(js, "pixelsHigh", &o))
1948  actualHeight = json_object_get_int64(o);
1949 
1950  if (p->lastWidth && p->lastHeight
1951  && (actualWidth != p->lastWidth || actualHeight != p->lastHeight))
1952  {
1953  call_once(p->vuoImageFunctionsInitialized, [=](){
1954  p->vuoImageMakeFromJsonWithDimensions = (Private::vuoImageMakeFromJsonWithDimensionsType)dlsym(RTLD_DEFAULT, "VuoImage_makeFromJsonWithDimensions");
1955  if (!p->vuoImageMakeFromJsonWithDimensions)
1956  {
1957  VUserLog("Error: Couldn't find VuoImage_makeFromJsonWithDimensions.");
1958  return;
1959  }
1960 
1961  p->vuoImageGetJson = (Private::vuoImageGetJsonType)dlsym(RTLD_DEFAULT, "VuoImage_getJson");
1962  if (!p->vuoImageGetJson)
1963  {
1964  VUserLog("Error: Couldn't find VuoImage_getJson.");
1965  return;
1966  }
1967  });
1968 
1970  {
1971  void *vi = p->vuoImageMakeFromJsonWithDimensions(js, p->lastWidth, p->lastHeight);
1972  return p->vuoImageGetJson(vi);
1973  }
1974  }
1975  }
1976 
1977  return js;
1978 }
1979 
1994 vector<VuoRunner::Port *> VuoRunner::getCachedPublishedPorts(bool input)
1995 {
1996  // Caching not only provides faster access (without zmq messages),
1997  // but also ensures that the VuoRunner::Port pointers passed to
1998  // VuoRunnerDelegate::receivedTelemetryPublishedOutputPortUpdated are consistent.
1999 
2000  if (input)
2001  {
2002  if (! arePublishedInputPortsCached)
2003  {
2004  publishedInputPorts = refreshPublishedPorts(true);
2005  arePublishedInputPortsCached = true;
2006  }
2007  return publishedInputPorts;
2008  }
2009  else
2010  {
2011  if (! arePublishedOutputPortsCached)
2012  {
2013  publishedOutputPorts = refreshPublishedPorts(false);
2014  arePublishedOutputPortsCached = true;
2015  }
2016  return publishedOutputPorts;
2017  }
2018 }
2019 
2031 vector<VuoRunner::Port *> VuoRunner::refreshPublishedPorts(bool input)
2032 {
2033  dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
2034  dispatch_source_t timeout = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
2035  dispatch_source_set_timer(timeout, dispatch_time(DISPATCH_TIME_NOW, 5*NSEC_PER_SEC), NSEC_PER_SEC, NSEC_PER_SEC/10);
2036  dispatch_source_set_event_handler(timeout, ^{
2037  stopBecauseLostContact("The connection between the composition and runner timed out when trying to receive the list of published ports");
2038  dispatch_source_cancel(timeout);
2039  });
2040  dispatch_resume(timeout);
2041 
2042  vector<VuoRunner::Port *> ports;
2043 
2044  try
2045  {
2046  vuoMemoryBarrier();
2047 
2048  enum VuoControlRequest requests[4];
2049  enum VuoControlReply replies[4];
2050  if (input)
2051  {
2058  }
2059  else
2060  {
2067  }
2068 
2069  vector<string> names;
2070  vector<string> types;
2071  vector<string> details;
2072 
2073  for (int i = 0; i < 3; ++i)
2074  {
2075  vuoControlRequestSend(requests[i], NULL, 0);
2076  vuoControlReplyReceive(replies[i]);
2077  vector<string> messageStrings = receiveListOfStrings();
2078  if (i == 0)
2079  names = messageStrings;
2080  else if (i == 1)
2081  types = messageStrings;
2082  else
2083  details = messageStrings;
2084  }
2085 
2086  for (size_t i = 0; i < names.size() && i < types.size() && i < details.size(); ++i)
2087  {
2088  VuoRunner::Port *port = new Port(names[i], types[i], json_tokener_parse(details[i].c_str()));
2089  ports.push_back(port);
2090  }
2091  }
2092  catch (...)
2093  {
2094  dispatch_source_cancel(timeout);
2095  dispatch_release(timeout);
2096  throw;
2097  }
2098 
2099  dispatch_source_cancel(timeout);
2100  dispatch_release(timeout);
2101 
2102  return ports;
2103 }
2104 
2114 vector<VuoRunner::Port *> VuoRunner::getPublishedInputPorts(void)
2115 {
2116  return getCachedPublishedPorts(true);
2117 }
2118 
2128 vector<VuoRunner::Port *> VuoRunner::getPublishedOutputPorts(void)
2129 {
2130  return getCachedPublishedPorts(false);
2131 }
2132 
2143 {
2144  vector<VuoRunner::Port *> inputPorts = getPublishedInputPorts();
2145  for (vector<VuoRunner::Port *>::iterator i = inputPorts.begin(); i != inputPorts.end(); ++i)
2146  if ((*i)->getName() == name)
2147  return *i;
2148 
2149  return NULL;
2150 }
2151 
2162 {
2163  vector<VuoRunner::Port *> outputPorts = getPublishedOutputPorts();
2164  for (vector<VuoRunner::Port *>::iterator i = outputPorts.begin(); i != outputPorts.end(); ++i)
2165  if ((*i)->getName() == name)
2166  return *i;
2167 
2168  return NULL;
2169 }
2170 
2182 void VuoRunner::listen()
2183 {
2184  // Name this thread.
2185  {
2186  const char *compositionName = dylibPath.empty() ? executablePath.c_str() : dylibPath.c_str();
2187 
2188  // Trim the path, if present.
2189  if (const char *lastSlash = strrchr(compositionName, '/'))
2190  compositionName = lastSlash + 1;
2191 
2192  char threadName[MAXTHREADNAMESIZE];
2193  snprintf(threadName, MAXTHREADNAMESIZE, "org.vuo.runner.telemetry: %s", compositionName);
2194  pthread_setname_np(threadName);
2195  }
2196 
2197  ZMQSelfReceive = zmq_socket(ZMQContext, ZMQ_PAIR);
2198  VuoRunner_configureSocket(ZMQSelfReceive, -1);
2199  if (zmq_bind(ZMQSelfReceive, "inproc://vuo-runner-self") != 0)
2200  {
2201  listenError = strerror(errno);
2202  dispatch_semaphore_signal(beganListeningSemaphore);
2203  return;
2204  }
2205 
2206  ZMQSelfSend = zmq_socket(ZMQContext, ZMQ_PAIR);
2207  VuoRunner_configureSocket(ZMQSelfSend, -1);
2208  if (zmq_connect(ZMQSelfSend, "inproc://vuo-runner-self") != 0)
2209  {
2210  listenError = strerror(errno);
2211  dispatch_semaphore_signal(beganListeningSemaphore);
2212  return;
2213  }
2214 
2215  {
2216  ZMQTelemetry = zmq_socket(ZMQContext,ZMQ_SUB);
2217  VuoRunner_configureSocket(ZMQTelemetry, -1);
2218  if(zmq_connect(ZMQTelemetry,ZMQTelemetryURL.c_str()))
2219  {
2220  listenError = strerror(errno);
2221  dispatch_semaphore_signal(beganListeningSemaphore);
2222  return;
2223  }
2224 
2225  const int highWaterMark = 0; // no limit
2226  if(zmq_setsockopt(ZMQTelemetry,ZMQ_RCVHWM,&highWaterMark,sizeof(highWaterMark)))
2227  {
2228  listenError = strerror(errno);
2229  dispatch_semaphore_signal(beganListeningSemaphore);
2230  return;
2231  }
2232  }
2233 
2234  {
2235  // subscribe to all types of telemetry
2236  char type = VuoTelemetryStats;
2237  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2239  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2241  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2243  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2245  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2247  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2249  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2250  type = VuoTelemetryEventDropped;
2251  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2252  type = VuoTelemetryError;
2253  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2255  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2256  }
2257 
2258  {
2259  // Wait until the connection is established, as evidenced by a heartbeat telemetry message
2260  // being received from the composition. This is necessary because the ØMQ API doesn't provide
2261  // any way to tell when a SUB socket is ready to receive messages, and if you call zmq_poll()
2262  // on it before it's ready, then it might miss messages that came in while it was still trying
2263  // to get ready. (The zmq_connect() function doesn't make any guarantees about the socket being ready.
2264  // It just starts some setup that may continue asynchronously after zmq_connect() has returned.)
2265  // To avoid missing important telemetry messages from the composition, we make sure that the
2266  // runner doesn't tell the composition to unpause until the runner has verified that it's
2267  // receiving heartbeat telemetry messages. http://zguide.zeromq.org/page:all#Node-Coordination
2268  zmq_pollitem_t items[]=
2269  {
2270  {ZMQTelemetry,0,ZMQ_POLLIN,0},
2271  };
2272  int itemCount = 1;
2273  long timeout = -1;
2274  zmq_poll(items,itemCount,timeout);
2275  }
2276 
2277  dispatch_semaphore_signal(beganListeningSemaphore);
2278 
2279  bool pendingCancel = false;
2280  while(! listenCanceled)
2281  {
2282  zmq_pollitem_t items[]=
2283  {
2284  {ZMQTelemetry,0,ZMQ_POLLIN,0},
2285  {ZMQSelfReceive,0,ZMQ_POLLIN,0},
2286  };
2287  int itemCount = 2;
2288 
2289  // Wait 1 second. If no telemetry was received in that second, we probably lost contact with the composition.
2290  long timeout = pendingCancel ? 100 : 1000;
2291  zmq_poll(items,itemCount,timeout);
2292  if(items[0].revents & ZMQ_POLLIN)
2293  {
2294  // Receive telemetry type.
2295  char type = vuoReceiveInt(ZMQTelemetry, NULL);
2296 
2297  // Receive telemetry arguments and forward to VuoRunnerDelegate.
2298  switch (type)
2299  {
2300  case VuoTelemetryStats:
2301  {
2302  unsigned long utime = vuoReceiveUnsignedInt64(ZMQTelemetry, NULL);
2303  unsigned long stime = vuoReceiveUnsignedInt64(ZMQTelemetry, NULL);
2304  dispatch_sync(delegateQueue, ^{
2305  if (delegate)
2306  delegate->receivedTelemetryStats(utime, stime);
2307  });
2308  break;
2309  }
2311  {
2312  char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2313  char *nodeIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2314  dispatch_sync(delegateQueue, ^{
2315  if (delegate)
2316  delegate->receivedTelemetryNodeExecutionStarted(compositionIdentifier, nodeIdentifier);
2317  });
2318  free(compositionIdentifier);
2319  free(nodeIdentifier);
2320  break;
2321  }
2323  {
2324  char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2325  char *nodeIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2326  dispatch_sync(delegateQueue, ^{
2327  if (delegate)
2328  delegate->receivedTelemetryNodeExecutionFinished(compositionIdentifier, nodeIdentifier);
2329  });
2330  free(compositionIdentifier);
2331  free(nodeIdentifier);
2332  break;
2333  }
2335  {
2336  while (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2337  {
2338  char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2339  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2340  {
2341  char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2342  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2343  {
2344  bool receivedEvent = vuoReceiveBool(ZMQTelemetry, NULL);
2345  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2346  {
2347  bool receivedData = vuoReceiveBool(ZMQTelemetry, NULL);
2348  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2349  {
2350  string portDataSummary;
2351  char *s = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2352  if (s)
2353  {
2354  portDataSummary = s;
2355  free(s);
2356  }
2357  else
2358  portDataSummary = "";
2359 
2360  dispatch_sync(delegateQueue, ^{
2361  if (delegate)
2362  delegate->receivedTelemetryInputPortUpdated(compositionIdentifier, portIdentifier, receivedEvent, receivedData, portDataSummary);
2363  });
2364  }
2365  }
2366  }
2367  free(portIdentifier);
2368  }
2369  free(compositionIdentifier);
2370  }
2371  break;
2372  }
2374  {
2375  while (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2376  {
2377  char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2378  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2379  {
2380  char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2381  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2382  {
2383  bool sentEvent = vuoReceiveBool(ZMQTelemetry, NULL);
2384  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2385  {
2386  bool sentData = vuoReceiveBool(ZMQTelemetry, NULL);
2387  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2388  {
2389  string portDataSummary;
2390  char *s = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2391  if (s)
2392  {
2393  portDataSummary = s;
2394  free(s);
2395  }
2396  else
2397  portDataSummary = "";
2398 
2399  dispatch_sync(delegateQueue, ^{
2400  if (delegate)
2401  delegate->receivedTelemetryOutputPortUpdated(compositionIdentifier, portIdentifier, sentEvent, sentData, portDataSummary);
2402  });
2403  }
2404  }
2405  }
2406  free(portIdentifier);
2407  }
2408  free(compositionIdentifier);
2409  }
2410  break;
2411  }
2413  {
2414  while (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2415  {
2416  char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2417  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2418  {
2419  bool sentData = vuoReceiveBool(ZMQTelemetry, NULL);
2420  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2421  {
2422  string portDataSummary;
2423  char *s = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2424  if (s)
2425  {
2426  portDataSummary = s;
2427  free(s);
2428  }
2429  else
2430  portDataSummary = "";
2431 
2432  Port *port = getPublishedOutputPortWithName(portIdentifier);
2433 
2434  dispatch_sync(delegateQueue, ^{
2435  if (delegate)
2436  delegate->receivedTelemetryPublishedOutputPortUpdated(port, sentData, portDataSummary);
2437  });
2438  }
2439  }
2440  free(portIdentifier);
2441  }
2442  break;
2443  }
2445  {
2446  saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2447  break;
2448  }
2450  {
2451  char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2452  char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2453  dispatch_sync(delegateQueue, ^{
2454  if (delegate)
2455  delegate->receivedTelemetryEventDropped(compositionIdentifier, portIdentifier);
2456  });
2457  free(compositionIdentifier);
2458  free(portIdentifier);
2459  break;
2460  }
2461  case VuoTelemetryError:
2462  {
2463  char *message = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2464  dispatch_sync(delegateQueue, ^{
2465  if (delegate)
2466  delegate->receivedTelemetryError( string(message) );
2467  });
2468  free(message);
2469  break;
2470  }
2472  {
2473  dispatch_sync(delegateQueue, ^{
2474  if (delegate)
2475  delegate->lostContactWithComposition();
2476  });
2477  dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2478  stop();
2479  });
2480  break;
2481  }
2482  default:
2483  VUserLog("Error: Unknown telemetry message type: %d", type);
2484  break;
2485  }
2486  }
2487  else if (! listenCanceled) // Either the 1-second timeout elapsed, or we got a stop-listening message from ZMQSelfSend
2488  {
2489  if (items[1].revents & ZMQ_POLLIN)
2490  {
2491  // This is a stop-listening message.
2492  vuoReceiveInt(ZMQSelfReceive, NULL);
2493 
2494  // Drain any remaining telemetry messages.
2495  pendingCancel = true;
2496  }
2497 
2498  else if (pendingCancel)
2499  listenCanceled = true;
2500 
2501  else
2502  {
2503  // Timeout.
2504  // Could happen if the composition crashed, or if the system fell asleep then hibernated (standby mode).
2505  // If it's a crash we should disconnect; if it's hibernation we should ignore the timeout and try zmq_poll again.
2507  VDebugLog("zmq_poll timed out, but system is sleeping so I'll try again.");
2508  else if (VuoLog_isDebuggerAttached())
2509  VDebugLog("zmq_poll timed out, but a debugger is attached to the host so I'll try again.");
2510  else
2511  {
2512  listenCanceled = true;
2513  string dir, file, ext;
2514  VuoFileUtilities::splitPath(executablePath, dir, file, ext);
2515  stopBecauseLostContact("The connection between the composition ('" + file + "') and runner timed out while listening for telemetry.");
2516  }
2517  }
2518  }
2519  }
2520 
2521  zmq_close(ZMQTelemetry);
2522  ZMQTelemetry = NULL;
2523 
2524  zmq_close(ZMQSelfSend);
2525  ZMQSelfSend = NULL;
2526  zmq_close(ZMQSelfReceive);
2527  ZMQSelfReceive = NULL;
2528 
2529  dispatch_semaphore_signal(endedListeningSemaphore);
2530  return;
2531 }
2532 
2542 void VuoRunner::vuoControlRequestSend(enum VuoControlRequest request, zmq_msg_t *messages, unsigned int messageCount)
2543 {
2544  char *error = NULL;
2545  vuoSend("runner VuoControl",ZMQControl,request,messages,messageCount,false,&error);
2546 
2547  if (error)
2548  {
2549  string e(error);
2550  free(error);
2551  throw VuoException(e);
2552  }
2553 }
2554 
2564 void VuoRunner::vuoLoaderControlRequestSend(enum VuoLoaderControlRequest request, zmq_msg_t *messages, unsigned int messageCount)
2565 {
2566  char *error = NULL;
2567  vuoSend("runner VuoLoaderControl",ZMQLoaderControl,request,messages,messageCount,false,&error);
2568 
2569  if (error)
2570  {
2571  string e(error);
2572  free(error);
2573  throw VuoException(e);
2574  }
2575 }
2576 
2585 void VuoRunner::vuoControlReplyReceive(enum VuoControlReply expectedReply)
2586 {
2587  char *error = NULL;
2588  int reply = vuoReceiveInt(ZMQControl, &error);
2589 
2590  if (error)
2591  {
2592  string e(error);
2593  free(error);
2594  ostringstream oss;
2595  oss << e << " (expected " << expectedReply << ")";
2596  throw VuoException(oss.str());
2597  }
2598  else if (reply != expectedReply)
2599  {
2600  ostringstream oss;
2601  oss << "The runner received the wrong message from the composition (expected " << expectedReply << ", received " << reply << ")";
2602  throw VuoException(oss.str());
2603  }
2604 }
2605 
2614 void VuoRunner::vuoLoaderControlReplyReceive(enum VuoLoaderControlReply expectedReply)
2615 {
2616  char *error = NULL;
2617  int reply = vuoReceiveInt(ZMQLoaderControl, &error);
2618 
2619  if (error)
2620  {
2621  string e(error);
2622  free(error);
2623  ostringstream oss;
2624  oss << e << " (expected " << expectedReply << ")";
2625  throw VuoException(oss.str());
2626  }
2627  else if (reply != expectedReply)
2628  {
2629  ostringstream oss;
2630  oss << "The runner received the wrong message from the composition loader (expected " << expectedReply << ", received " << reply << ")";
2631  throw VuoException(oss.str());
2632  }
2633 }
2634 
2640 string VuoRunner::receiveString(string fallbackIfNull)
2641 {
2642  char *error = NULL;
2643  char *s = vuoReceiveAndCopyString(ZMQControl, &error);
2644 
2645  if (error)
2646  {
2647  string e(error);
2648  free(error);
2649  throw VuoException(e);
2650  }
2651 
2652  string ret;
2653  if (s)
2654  {
2655  ret = s;
2656  free(s);
2657  }
2658  else
2659  ret = fallbackIfNull;
2660 
2661  return ret;
2662 }
2663 
2667 vector<string> VuoRunner::receiveListOfStrings(void)
2668 {
2669  vector<string> messageStrings;
2671  {
2672  string s = receiveString("");
2673  messageStrings.push_back(s);
2674  }
2675  return messageStrings;
2676 }
2677 
2683 void VuoRunner::saturating_semaphore_signal(dispatch_semaphore_t dsema, bool *signaled)
2684 {
2685  if (__sync_bool_compare_and_swap(signaled, false, true))
2686  dispatch_semaphore_signal(dsema);
2687 }
2688 
2694 void VuoRunner::saturating_semaphore_wait(dispatch_semaphore_t dsema, bool *signaled)
2695 {
2696  *signaled = false;
2697  dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
2698 }
2699 
2704 {
2705  return stopped;
2706 }
2707 
2711 bool VuoRunner::isInCurrentProcess(void)
2712 {
2713  return executablePath.empty();
2714 }
2715 
2720 bool VuoRunner::isUsingCompositionLoader(void)
2721 {
2722  return ! executablePath.empty() && ! dylibPath.empty();
2723 }
2724 
2729 {
2730  dispatch_sync(delegateQueue, ^{
2731  this->delegate = delegate;
2732  });
2733 }
2734 
2738 void VuoRunner::stopBecauseLostContact(string errorMessage)
2739 {
2740  __block bool alreadyLostContact;
2741  dispatch_sync(delegateQueue, ^{
2742  alreadyLostContact = lostContact;
2743  lostContact = true;
2744  });
2745 
2746  if (alreadyLostContact)
2747  return;
2748 
2749  saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2750 
2751  dispatch_sync(delegateQueue, ^{
2752  if (delegate)
2753  delegate->lostContactWithComposition();
2754  });
2755 
2756  dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2757  stop();
2758  });
2759 
2760  if (! isInCurrentProcess())
2761  {
2762  // Normally, stop() is responsible for terminating the ZMQ context.
2763  // But, if stopBecauseLostContact() is called, it takes the responsibility away from stop().
2764  // If there's an in-progress zmq_recv() call, stop() will get stuck waiting on controlQueue, so
2765  // the below call to terminate the ZMQ context interrupts zmq_recv() and allows stop() to proceed.
2766  dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2767  vuoMemoryBarrier();
2768 
2769  zmq_term(ZMQContext);
2770  ZMQContext = NULL;
2771  dispatch_semaphore_signal(terminatedZMQContextSemaphore);
2772  });
2773  }
2774 
2775  VUserLog("%s", errorMessage.c_str());
2776 }
2777 
2784 {
2785  return compositionPid;
2786 }
2787 
2795 VuoRunner::Port::Port(string name, string type, json_object *details)
2796 {
2797  this->name = name;
2798  this->type = type;
2799  this->details = details;
2800 }
2801 
2806 {
2807  return name;
2808 }
2809 
2814 {
2815  return type;
2816 }
2817 
2846 {
2847  return details;
2848 }
2849 
2850 VuoRunnerDelegate::~VuoRunnerDelegate() { } // Fixes "undefined symbols" error