Vuo  2.0.0
VuoRunner.cc
Go to the documentation of this file.
1 
10 #include "VuoRunner.hh"
11 #include "VuoFileUtilities.hh"
12 #include "VuoStringUtilities.hh"
13 #include "VuoEventLoop.h"
14 #include "VuoException.hh"
15 #include "VuoRuntime.h"
17 
18 #include <CoreServices/CoreServices.h>
19 #include <pthread.h>
20 #include <stdio.h>
21 #include <dlfcn.h>
22 #include <sstream>
23 #include <copyfile.h>
24 #include <mach-o/loader.h>
25 #include <sys/proc_info.h>
26 #include <sys/stat.h>
27 
28 void *VuoApp_mainThread = NULL;
29 static const char *mainThreadChecker = "/Applications/Xcode.app/Contents/Developer/usr/lib/libMainThreadChecker.dylib";
31 
36 static void VuoRunner_closeOnExec(int fd)
37 {
38  int flags = fcntl(fd, F_GETFD);
39  if (flags < 0)
40  {
41  VUserLog("Error: Couldn't get flags for desciptor %d: %s", fd, strerror(errno));
42  return;
43  }
44 
45  flags |= FD_CLOEXEC;
46 
47  if (fcntl(fd, F_SETFD, flags) != 0)
48  VUserLog("Error: Couldn't set FD_CLOEXEC on descriptor %d: %s", fd, strerror(errno));
49 }
50 
54 static void __attribute__((constructor)) VuoRunner_init()
55 {
56  VuoApp_mainThread = (void *)pthread_self();
57 
58 #pragma clang diagnostic push
59 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
60  // Calls _TSGetMainThread().
61  // https://b33p.net/kosada/node/12944
62  YieldToAnyThread();
63 #pragma clang diagnostic pop
64 
66 
67  // Ensure that the write end of this pipe gets closed upon fork()/exec(),
68  // so child processes don't prop open this pipe,
69  // which would prevent Vuo compositions from quitting when the VuoRunner process quits.
71 }
72 
76 static bool isMainThread(void)
77 {
78  return VuoApp_mainThread == (void *)pthread_self();
79 }
80 
84 static void VuoRunner_configureSocket(void *zmqSocket, int timeoutInSeconds)
85 {
86  if (timeoutInSeconds >= 0)
87  {
88  int timeoutInMilliseconds = timeoutInSeconds * 1000;
89  zmq_setsockopt(zmqSocket, ZMQ_RCVTIMEO, &timeoutInMilliseconds, sizeof timeoutInMilliseconds);
90  zmq_setsockopt(zmqSocket, ZMQ_SNDTIMEO, &timeoutInMilliseconds, sizeof timeoutInMilliseconds);
91  }
92 
93  int linger = 0; // avoid having zmq_term block if the runner has tried to send a message on a broken connection
94  zmq_setsockopt(zmqSocket, ZMQ_LINGER, &linger, sizeof linger);
95 }
96 
107 VuoRunner * VuoRunner::newSeparateProcessRunnerFromExecutable(string executablePath, string sourceDir,
108  bool continueIfRunnerDies, bool deleteExecutableWhenFinished)
109 {
110  VuoRunner * vr = new VuoRunner();
111  vr->executablePath = executablePath;
112  vr->shouldContinueIfRunnerDies = continueIfRunnerDies;
113  vr->shouldDeleteBinariesWhenFinished = deleteExecutableWhenFinished;
114  vr->sourceDir = sourceDir;
115  return vr;
116 }
117 
133 VuoRunner * VuoRunner::newSeparateProcessRunnerFromDynamicLibrary(string compositionLoaderPath, string compositionDylibPath,
134  VuoRunningCompositionLibraries *runningCompositionLibraries,
135  string sourceDir, bool continueIfRunnerDies, bool deleteDylibsWhenFinished)
136 {
137  VuoRunner * vr = new VuoRunner();
138  vr->executablePath = compositionLoaderPath;
139  vr->dylibPath = compositionDylibPath;
140  vr->dependencyLibraries = runningCompositionLibraries;
141  vr->sourceDir = sourceDir;
142  vr->shouldContinueIfRunnerDies = continueIfRunnerDies;
143  vr->shouldDeleteBinariesWhenFinished = deleteDylibsWhenFinished;
144  runningCompositionLibraries->setDeleteResourceLibraries(deleteDylibsWhenFinished);
145  return vr;
146 }
147 
158  bool deleteDylibWhenFinished)
159 {
160  VuoRunner * vr = new VuoRunner();
161  vr->dylibPath = dylibPath;
162  vr->shouldDeleteBinariesWhenFinished = deleteDylibWhenFinished;
163  vr->sourceDir = sourceDir;
164  return vr;
165 }
166 
173 {
174  dispatch_release(stoppedSemaphore);
175  dispatch_release(terminatedZMQContextSemaphore);
176  dispatch_release(beganListeningSemaphore);
177  dispatch_release(endedListeningSemaphore);
178  dispatch_release(lastFiredEventSemaphore);
179  dispatch_release(delegateQueue);
180 }
181 
186 void VuoRunner::setRuntimeChecking(bool runtimeCheckingEnabled)
187 {
188  if (!stopped)
189  {
190  VUserLog("Error: Only call VuoRunner::setRuntimeChecking() prior to starting the composition.");
191  return;
192  }
193 
194  isRuntimeCheckingEnabled = runtimeCheckingEnabled && VuoFileUtilities::fileExists(mainThreadChecker);
195 }
196 
200 VuoRunner::VuoRunner(void)
201 {
202  dylibHandle = NULL;
203  dependencyLibraries = NULL;
204  shouldContinueIfRunnerDies = false;
205  shouldDeleteBinariesWhenFinished = false;
206  isRuntimeCheckingEnabled = false;
207  paused = true;
208  stopped = true;
209  lostContact = false;
210  listenCanceled = false;
211  stoppedSemaphore = dispatch_semaphore_create(1);
212  terminatedZMQContextSemaphore = dispatch_semaphore_create(0);
213  beganListeningSemaphore = dispatch_semaphore_create(0);
214  endedListeningSemaphore = dispatch_semaphore_create(1);
215  lastFiredEventSemaphore = dispatch_semaphore_create(0);
216  lastFiredEventSignaled = false;
217  controlQueue = dispatch_queue_create("org.vuo.runner.control", NULL);
218  ZMQContext = NULL;
219  ZMQSelfSend = NULL;
220  ZMQSelfReceive = NULL;
221  ZMQControl = NULL;
222  ZMQTelemetry = NULL;
223  ZMQLoaderControl = NULL;
224  delegate = NULL;
225  delegateQueue = dispatch_queue_create("org.vuo.runner.delegate", NULL);
226  arePublishedInputPortsCached = false;
227  arePublishedOutputPortsCached = false;
228 }
229 
246 {
247  try
248  {
249  startInternal();
250 
251  if (isInCurrentProcess())
252  {
253  dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
254  dispatch_async(queue, ^{
255  unpause();
256  });
257  while (paused)
258  {
260  usleep(USEC_PER_SEC / 1000);
261  }
262  }
263  else
264  {
265  unpause();
266  }
267  }
268  catch (VuoException &e)
269  {
270  stopBecauseLostContact(e.what());
271  }
272 }
273 
293 {
294  try
295  {
296  startInternal();
297  }
298  catch (VuoException &e)
299  {
300  stopBecauseLostContact(e.what());
301  }
302 }
303 
308 void VuoRunner::copyDylibAndChangeId(string dylibPath, string &outputDylibPath)
309 {
310  string directory, file, extension;
311  VuoFileUtilities::splitPath(dylibPath, directory, file, extension);
312 
313  const int makeTmpFileExtension = 7;
314  if (file.length() > makeTmpFileExtension)
315  {
316  // makeTmpFile() appends "-XXXXXX"; make room for that.
317  string trimmedFile = file.substr(0, file.length() - makeTmpFileExtension);
318 
319  bool alreadyLoaded;
320  do
321  {
322  outputDylibPath = VuoFileUtilities::makeTmpFile(trimmedFile, "dylib");
323  alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
324  } while (alreadyLoaded);
325  }
326  else
327  {
328  // For short names, like those generated by VDMX, just replace the entire name with a hash.
329  // https://b33p.net/kosada/node/12917
330  bool alreadyLoaded;
331  do
332  {
333  string hash = VuoStringUtilities::makeRandomHash(file.length());
334  outputDylibPath = "/tmp/" + hash + ".dylib";
335  alreadyLoaded = dlopen(outputDylibPath.c_str(), RTLD_NOLOAD);
336  } while (alreadyLoaded);
337  }
338 
339  string newDirectory, newFile, newExtension;
340  VuoFileUtilities::splitPath(outputDylibPath, newDirectory, newFile, newExtension);
341 
342  if (newFile.length() > file.length())
343  throw VuoException("The composition couldn't start because the uniqued dylib name (" + newFile + ") is longer than the original dylib name (" + file + ").");
344 
345  if (copyfile(dylibPath.c_str(), outputDylibPath.c_str(), NULL, COPYFILE_ALL))
346  throw VuoException("The composition couldn't start because a copy of the dylib couldn't be made.");
347 
348  FILE *fp = fopen(outputDylibPath.c_str(), "r+b");
349  if (!fp)
350  throw VuoException("The composition couldn't start because the dylib's header couldn't be opened.");
351  VuoDefer(^{ fclose(fp); });
352 
353  struct mach_header_64 header;
354  if (fread(&header, sizeof(header), 1, fp) != 1)
355  throw VuoException("The composition couldn't start because the dylib's header couldn't be read.");
356 
357  if (header.magic != MH_MAGIC_64
358  || header.cputype != CPU_TYPE_X86_64)
359  throw VuoException("The composition couldn't start because the dylib isn't an x86_64-only (non-fat) Mach-O binary.");
360 
361  for (int i = 0; i < header.ncmds; ++i)
362  {
363  struct load_command lc;
364  if (fread(&lc, sizeof(lc), 1, fp) != 1)
365  throw VuoException("The composition couldn't start because the dylib's command couldn't be read.");
366 
367  // VLog("cmd[%d]: %x (size %d)",i,lc.cmd,lc.cmdsize);
368  if (lc.cmd == LC_ID_DYLIB)
369  {
370  fseek(fp, sizeof(struct dylib), SEEK_CUR);
371 
372  size_t nameLength = lc.cmdsize - sizeof(struct dylib_command);
373  char *name = (char *)calloc(nameLength + 1, 1);
374  if (fread(name, nameLength, 1, fp) != 1)
375  throw VuoException("The composition couldn't start because the dylib's ID command couldn't be read.");
376 
377 // VLog("Changing name \"%s\" to \"%s\"…", name, outputDylibPath.c_str());
378  fseek(fp, -nameLength, SEEK_CUR);
379  bzero(name, nameLength);
380  memcpy(name, outputDylibPath.c_str(), min(nameLength, outputDylibPath.length()));
381  fwrite(name, nameLength, 1, fp);
382  return;
383  }
384  else
385  fseek(fp, lc.cmdsize-sizeof(lc), SEEK_CUR);
386  }
387 
388  throw VuoException("The composition couldn't start because the dylib's LC_ID_DYLIB command couldn't be found.");
389 }
390 
400 void VuoRunner::startInternal(void)
401 {
402  stopped = false;
403  dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
404 
405  ZMQContext = zmq_init(1);
406 
407  if (isInCurrentProcess())
408  {
409  // Start the composition in the current process.
410 
411  bool alreadyLoaded = dlopen(dylibPath.c_str(), RTLD_NOLOAD);
412  if (alreadyLoaded)
413  {
414  // Each composition instance needs its own global variables.
415  // Change the dylib's internal name, to convince dlopen() to load another instance of it.
416 
417  string uniquedDylibPath;
418  copyDylibAndChangeId(dylibPath, uniquedDylibPath);
419  VDebugLog("\"%s\" is already loaded, so I duplicated it and changed its LC_ID_DYLIB to \"%s\".", dylibPath.c_str(), uniquedDylibPath.c_str());
420 
421  if (shouldDeleteBinariesWhenFinished)
422  remove(dylibPath.c_str());
423 
424  dylibPath = uniquedDylibPath;
425  shouldDeleteBinariesWhenFinished = true;
426  }
427 
428  dylibHandle = dlopen(dylibPath.c_str(), RTLD_NOW);
429  if (!dylibHandle)
430  throw VuoException("The composition couldn't start because the library '" + dylibPath + "' couldn't be loaded : " + dlerror());
431 
432  try
433  {
435  if (! vuoInitInProcess)
436  throw VuoException("The composition couldn't start because vuoInitInProcess() couldn't be found in '" + dylibPath + "' : " + dlerror());
437 
438  ZMQControlURL = "inproc://" + VuoFileUtilities::makeTmpFile("vuo-control", "");
439  ZMQTelemetryURL = "inproc://" + VuoFileUtilities::makeTmpFile("vuo-telemetry", "");
440 
441  vuoInitInProcess(ZMQContext, ZMQControlURL.c_str(), ZMQTelemetryURL.c_str(), true, getpid(), -1, false,
442  sourceDir.c_str(), dylibHandle, NULL, false);
443  }
444  catch (VuoException &e)
445  {
446  VUserLog("error: %s", e.what());
447  dlclose(dylibHandle);
448  dylibHandle = NULL;
449  throw;
450  }
451  }
452  else
453  {
454  // Start the composition or composition loader in a new process.
455 
456  vector<string> args;
457 
458  string executableName;
459  if (isUsingCompositionLoader())
460  {
461  // If we're using the loader, set the executable's display name to the dylib,
462  // so that composition's name shows up in the process list.
463  string dir, file, ext;
464  VuoFileUtilities::splitPath(dylibPath, dir, file, ext);
465  executableName = file;
466  }
467  else
468  {
469  string dir, file, ext;
470  VuoFileUtilities::splitPath(executablePath, dir, file, ext);
471  string executableName = file;
472  if (! ext.empty())
473  executableName += "." + ext;
474  }
475  args.push_back(executableName);
476 
477  // https://b33p.net/kosada/node/16374
478  // The socket's full pathname (`sockaddr_un::sun_path`) must be 104 characters or less
479  // (https://opensource.apple.com/source/xnu/xnu-2782.1.97/bsd/sys/un.h.auto.html).
480  // "/Users/me/Library/Containers/com.apple.ScreenSaver.Engine.legacyScreenSaver/Data/vuo-telemetry-rr8Br3"
481  // is 101 characters, which limits the username to 5 characters.
482  // "/Users/me/Library/Containers/com.apple.ScreenSaver.Engine.legacyScreenSaver/Data/v-rr8Br3"
483  // is 89 characters, which limits the username to 17 characters
484  // (still not a lot, but more likely to work with typical macOS usernames).
485  ZMQControlURL = "ipc://" + VuoFileUtilities::makeTmpFile("v", "");
486  ZMQTelemetryURL = "ipc://" + VuoFileUtilities::makeTmpFile("v", "");
487  args.push_back("--vuo-control=" + ZMQControlURL);
488  args.push_back("--vuo-telemetry=" + ZMQTelemetryURL);
489 
490  {
491  ostringstream oss;
492  oss << getpid();
493  args.push_back("--vuo-runner-pid=" + oss.str());
494  }
495 
496  {
497  ostringstream oss;
499  args.push_back("--vuo-runner-pipe=" + oss.str());
500  }
501 
502  if (shouldContinueIfRunnerDies)
503  args.push_back("--vuo-continue-if-runner-dies");
504 
505  if (isUsingCompositionLoader())
506  {
507  ZMQLoaderControlURL = "ipc://" + VuoFileUtilities::makeTmpFile("v", "");
508  args.push_back("--vuo-loader=" + ZMQLoaderControlURL);
509  }
510  else
511  args.push_back("--vuo-pause");
512 
513  int fd[2];
514  int ret = pipe(fd);
515  if (ret)
516  throw VuoException("The composition couldn't start because a pipe couldn't be opened : " + string(strerror(errno)));
517 
518  char * argv[7];
519  int argSize = args.size();
520  for (size_t i = 0; i < argSize; ++i)
521  {
522  size_t mallocSize = args[i].length() + 1;
523  argv[i] = (char *)malloc(mallocSize);
524  strlcpy(argv[i], args[i].c_str(), mallocSize);
525  }
526  argv[argSize] = NULL;
527 
528  string errorWorkingDirectory = "The composition couldn't start because the working directory couldn't be changed to '" + sourceDir + "' : ";
529  string errorExecutable = "The composition couldn't start because the file '" + executablePath + "' couldn't be executed : ";
530  string errorFork = "The composition couldn't start because the composition process couldn't be forked : ";
531  const size_t ERROR_BUFFER_LEN = 256;
532  char errorBuffer[ERROR_BUFFER_LEN];
533 
534  pipe(runnerReadCompositionWritePipe);
535 
536  pid_t childPid = fork();
537  if (childPid == 0)
538  {
539  // There are only a limited set of functions you're allowed to call in the child process
540  // after fork() and before exec(). Functions such as VUserLog() and exit() aren't allowed,
541  // so instead we're calling alternatives such as write() and _exit().
542 
543  close(runnerReadCompositionWritePipe[0]);
544 
545  pid_t grandchildPid = fork();
546  if (grandchildPid == 0)
547  {
548  close(fd[0]);
549  close(fd[1]);
550 
551  // Set the current working directory to that of the source .vuo composition so that
552  // relative URL paths are resolved correctly.
553  if (!sourceDir.empty())
554  {
555  ret = chdir(sourceDir.c_str());
556  if (ret)
557  {
558  strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
559  write(STDERR_FILENO, errorWorkingDirectory.c_str(), errorWorkingDirectory.length());
560  write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
561  write(STDERR_FILENO, "\n", 1);
562  _exit(-1);
563  }
564  }
565 
566  if (isRuntimeCheckingEnabled)
567  setenv("DYLD_INSERT_LIBRARIES", mainThreadChecker, 1);
568 
569  ret = execv(executablePath.c_str(), argv);
570  if (ret)
571  {
572  strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
573  write(STDERR_FILENO, errorExecutable.c_str(), errorExecutable.length());
574  write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
575  write(STDERR_FILENO, "\n", 1);
576  for (size_t i = 0; i < argSize; ++i)
577  free(argv[i]);
578  _exit(-1);
579  }
580  }
581  else if (grandchildPid > 0)
582  {
583  close(fd[0]);
584 
585  write(fd[1], &grandchildPid, sizeof(pid_t));
586  close(fd[1]);
587 
588  _exit(0);
589  }
590  else
591  {
592  close(fd[0]);
593  close(fd[1]);
594 
595  strerror_r(errno, errorBuffer, ERROR_BUFFER_LEN);
596  write(STDERR_FILENO, errorFork.c_str(), errorFork.length());
597  write(STDERR_FILENO, errorBuffer, strlen(errorBuffer));
598  write(STDERR_FILENO, "\n", 1);
599  _exit(-1);
600  }
601  }
602  else if (childPid > 0)
603  {
604  close(fd[1]);
605 
606  // If this process launches compositions in addition to this one,
607  // ensure they don't prop open this pipe,
608  // which would prevent VuoRunner::stop's `read()` from terminating.
609  VuoRunner_closeOnExec(runnerReadCompositionWritePipe[1]);
610 
611  for (size_t i = 0; i < argSize; ++i)
612  free(argv[i]);
613 
614  pid_t grandchildPid;
615  read(fd[0], &grandchildPid, sizeof(pid_t));
616  close(fd[0]);
617 
618  // Reap the child process.
619  int status;
620  int ret;
621  do {
622  ret = waitpid(childPid, &status, 0);
623  } while (ret == -1 && errno == EINTR);
624  if (WIFEXITED(status) && WEXITSTATUS(status))
625  throw VuoException("The composition couldn't start because the parent of the composition process exited with an error.");
626  else if (WIFSIGNALED(status))
627  throw VuoException("The composition couldn't start because the parent of the composition process exited abnormally : " + string(strsignal(WTERMSIG(status))));
628 
629  if (grandchildPid > 0)
630  compositionPid = grandchildPid;
631  else
632  throw VuoException("The composition couldn't start because the composition process id couldn't be obtained");
633  }
634  else
635  {
636  for (size_t i = 0; i < argSize; ++i)
637  free(argv[i]);
638 
639  throw VuoException("The composition couldn't start because the parent of the composition process couldn't be forked : " + string(strerror(errno)));
640  }
641  }
642 
643  // Connect to the composition loader (if any) and composition.
644  if (isUsingCompositionLoader())
645  {
646  ZMQLoaderControl = zmq_socket(ZMQContext,ZMQ_REQ);
648 
649  // Try to connect to the composition loader. If at first we don't succeed, give the composition loader a little more time to set up the socket.
650  int numTries = 0;
651  while (zmq_connect(ZMQLoaderControl,ZMQLoaderControlURL.c_str()))
652  {
653  if (++numTries == 1000)
654  throw VuoException("The composition couldn't start because the runner couldn't establish communication with the composition loader : " + string(strerror(errno)));
655  usleep(USEC_PER_SEC / 1000);
656  }
657 
658  replaceComposition(dylibPath, "");
659  }
660  else
661  {
662  __block string errorMessage;
663  dispatch_sync(controlQueue, ^{
664  try {
665  setUpConnections();
666  } catch (VuoException &e) {
667  errorMessage = e.what();
668  }
669  });
670  if (! errorMessage.empty())
671  throw VuoException(errorMessage);
672  }
673 }
674 
679 void *VuoRunner_listen(void *context)
680 {
681  pthread_detach(pthread_self());
682  VuoRunner *runner = static_cast<VuoRunner *>(context);
683  runner->listen();
684  return NULL;
685 }
686 
692 void VuoRunner::setUpConnections(void)
693 {
694  ZMQControl = zmq_socket(ZMQContext,ZMQ_REQ);
696 
697  // Try to connect to the composition. If at first we don't succeed, give the composition a little more time to set up the socket.
698  int numTries = 0;
699  while (zmq_connect(ZMQControl,ZMQControlURL.c_str()))
700  {
701  if (++numTries == 1000)
702  throw VuoException("The composition couldn't start because the runner couldn't establish communication to control the composition : " + string(strerror(errno)));
703  usleep(USEC_PER_SEC / 1000);
704  }
705 
706  // Cache published ports so they're available whenever a caller starts listening for published port value changes.
707  arePublishedInputPortsCached = false;
708  arePublishedOutputPortsCached = false;
709  if (isInCurrentProcess())
710  {
711  dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
712  __block string publishedPortsError;
713  dispatch_async(queue, ^{
714  try {
715  getCachedPublishedPorts(false);
716  getCachedPublishedPorts(true);
717  } catch (VuoException &e) {
718  publishedPortsError = e.what();
719  }
720  });
721  while (! (arePublishedInputPortsCached && arePublishedOutputPortsCached) )
722  {
724  usleep(USEC_PER_SEC / 1000);
725 
726  if (! publishedPortsError.empty())
727  throw VuoException(publishedPortsError);
728  }
729  }
730  else
731  {
732  getCachedPublishedPorts(false);
733  getCachedPublishedPorts(true);
734  }
735 
736  listenCanceled = false;
737  dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
738 
739  pthread_t listenThread;
740  int ret = pthread_create(&listenThread, nullptr, &VuoRunner_listen, this);
741  if (ret)
742  throw VuoException(string("The composition couldn't start because the runner couldn't create a thread: ") + strerror(ret));
743 
744  dispatch_semaphore_wait(beganListeningSemaphore, DISPATCH_TIME_FOREVER);
745  if (!listenError.empty())
746  throw VuoException("The composition couldn't start because the runner couldn't establish communication to listen to the composition: " + listenError);
747 
750 }
751 
767 {
768  if (! isInCurrentProcess())
769  throw VuoException("The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
770 
771  if (! isMainThread())
772  throw VuoException("This is not the main thread. Only call this function from the main thread.");
773 
774  while (! stopped)
775  VuoEventLoop_processEvent(VuoEventLoop_WaitIndefinitely);
776 }
777 
803 {
804  if (! isInCurrentProcess())
805  throw VuoException("The composition is not running in the current process. Only use this function if the composition was constructed with newCurrentProcessRunnerFromDynamicLibrary().");
806 
807  if (! isMainThread())
808  throw VuoException("This is not the main thread. Only call this function from the main thread.");
809 
810  VuoEventLoop_processEvent(VuoEventLoop_RunOnce);
811 }
812 
821 {
822  dispatch_sync(controlQueue, ^{
823  if (stopped || lostContact) {
824  return;
825  }
826 
828 
829  try
830  {
833  }
834  catch (VuoException &e)
835  {
836  stopBecauseLostContact(e.what());
837  }
838 
839  paused = true;
840  });
841 }
842 
849 {
850  dispatch_sync(controlQueue, ^{
851  if (stopped || lostContact) {
852  return;
853  }
854 
856 
857  try
858  {
861  }
862  catch (VuoException &e)
863  {
864  stopBecauseLostContact(e.what());
865  }
866 
867  paused = false;
868  });
869 }
870 
890 void VuoRunner::replaceComposition(string compositionDylibPath, string compositionDiff)
891 {
892  if (! isUsingCompositionLoader())
893  throw VuoException("The runner is not using a composition loader. Only use this function if the composition was constructed with newSeparateProcessRunnerFromDynamicLibrary().");
894 
895  dispatch_sync(controlQueue, ^{
896  if (stopped || lostContact) {
897  return;
898  }
899 
900  VDebugLog("Loading composition…");
901 
902  if (dylibPath != compositionDylibPath)
903  {
904  if (shouldDeleteBinariesWhenFinished)
905  {
906  remove(dylibPath.c_str());
907  }
908 
909  dylibPath = compositionDylibPath;
910  }
911 
913 
914  try
915  {
916  if (! paused)
917  {
918  VDebugLog(" Pausing…");
921  }
922 
923  cleanUpConnections();
924 
925  vector<string> dependencyDylibPathsRemoved = dependencyLibraries->dequeueLibrariesToUnload();
926  vector<string> dependencyDylibPathsAdded = dependencyLibraries->dequeueLibrariesToLoad();
927 
928  unsigned int messageCount = 4 + dependencyDylibPathsAdded.size() + dependencyDylibPathsRemoved.size();
929  zmq_msg_t *messages = (zmq_msg_t *)malloc(messageCount * sizeof(zmq_msg_t));
930  int index = 0;
931 
932  vuoInitMessageWithString(&messages[index++], dylibPath.c_str());
933 
934  vuoInitMessageWithInt(&messages[index++], dependencyDylibPathsAdded.size());
935  for (vector<string>::iterator i = dependencyDylibPathsAdded.begin(); i != dependencyDylibPathsAdded.end(); ++i) {
936  vuoInitMessageWithString(&messages[index++], (*i).c_str());
937  }
938 
939  vuoInitMessageWithInt(&messages[index++], dependencyDylibPathsRemoved.size());
940  for (vector<string>::iterator i = dependencyDylibPathsRemoved.begin(); i != dependencyDylibPathsRemoved.end(); ++i) {
941  vuoInitMessageWithString(&messages[index++], (*i).c_str());
942  }
943 
944  vuoInitMessageWithString(&messages[index], compositionDiff.c_str());
945 
946  if (! paused)
947  VDebugLog(" Replacing composition…");
948 
949  vuoLoaderControlRequestSend(VuoLoaderControlRequestCompositionReplace,messages,messageCount);
950  vuoLoaderControlReplyReceive(VuoLoaderControlReplyCompositionReplaced);
951 
952  setUpConnections();
953 
954  if (! paused)
955  {
956  VDebugLog(" Unpausing…");
959  }
960 
961  VDebugLog(" Done.");
962  }
963  catch (VuoException &e)
964  {
965  stopBecauseLostContact(e.what());
966  }
967  });
968 }
969 
983 void VuoRunner::stop(void)
984 {
985  dispatch_sync(controlQueue, ^{
986  if (stopped) {
987  return;
988  }
989 
991 
992  // Only tell the composition to stop if it hasn't already ended on its own.
993  if (! lostContact)
994  {
995  try
996  {
997  int timeoutInSeconds = (isInCurrentProcess() ? -1 : 5);
998  zmq_msg_t messages[3];
999  vuoInitMessageWithInt(&messages[0], timeoutInSeconds);
1000  vuoInitMessageWithBool(&messages[1], false); // isBeingReplaced
1001  vuoInitMessageWithBool(&messages[2], !isInCurrentProcess()); // isLastEverInProcess
1003 
1004  if (isInCurrentProcess() && isMainThread())
1005  {
1006  // If VuoRunner::stop() is blocking the main thread, wait for the composition's reply on another thread, and drain the main queue, in case the composition needs to shut down stuff that requires the main queue.
1007  __block bool replyReceived = false;
1008  dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
1009  vuoMemoryBarrier();
1010  try
1011  {
1013  }
1014  catch (...)
1015  {
1016  // do nothing; doesn't matter if connection timed out
1017  }
1018  replyReceived = true;
1019  });
1020  while (!replyReceived)
1021  {
1023  usleep(USEC_PER_SEC / 1000);
1024  }
1025  vuoMemoryBarrier();
1026  }
1027  else
1029  }
1030  catch (...)
1031  {
1032  // do nothing; doesn't matter if connection timed out
1033  }
1034  }
1035 
1036  cleanUpConnections();
1037 
1038  if (isUsingCompositionLoader() && ZMQLoaderControl)
1039  {
1040  zmq_close(ZMQLoaderControl);
1041  ZMQLoaderControl = NULL;
1042  }
1043 
1044  if (isInCurrentProcess() && dylibHandle)
1045  {
1046  VuoInitInProcessType *vuoInitInProcess = (VuoInitInProcessType *)dlsym(dylibHandle, "vuoInitInProcess");
1047  if (vuoInitInProcess) // Avoid double jeopardy if startInternal() already failed for missing vuoInitInProcess.
1048  {
1049  VuoFiniType *vuoFini = (VuoFiniType *)dlsym(dylibHandle, "vuoFini");
1050  if (! vuoFini)
1051  {
1052  VUserLog("The composition couldn't stop because vuoFini() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1053  return;
1054  }
1055  void *runtimeState = vuoFini();
1056 
1058  if (! vuoFiniRuntimeState)
1059  {
1060  VUserLog("The composition couldn't stop because vuoFiniRuntimeState() couldn't be found in '%s' : %s", dylibPath.c_str(), dlerror());
1061  return;
1062  }
1064  }
1065 
1066  dlclose(dylibHandle);
1067  dylibHandle = NULL;
1068  }
1069  else if (isInCurrentProcess() && !dylibHandle)
1070  {
1071  // If the dylib isn't open, the composition isn't running, so there's nothing to clean up.
1072  }
1073  else
1074  {
1075  char buf[1];
1076  close(runnerReadCompositionWritePipe[1]);
1077 
1078  if (! lostContact)
1079  {
1080  // Wait for child process to end.
1081  // Can't use waitpid() since it only waits on child processes, yet compositionPid is a grandchild.
1082  // Instead, do a blocking read() — the grandchild never writes anything to the pipe, and when the grandchild exits,
1083  // read() will return EOF (since it was the last process that had it open for writing).
1084  read(runnerReadCompositionWritePipe[0], &buf, 1);
1085  }
1086 
1087  close(runnerReadCompositionWritePipe[0]);
1088 
1089  if (! lostContact)
1090  {
1091  zmq_term(ZMQContext);
1092  ZMQContext = NULL;
1093  }
1094  else
1095  {
1096  dispatch_semaphore_wait(terminatedZMQContextSemaphore, DISPATCH_TIME_FOREVER);
1097  }
1098  }
1099 
1100  if (shouldDeleteBinariesWhenFinished)
1101  {
1102  if (isUsingCompositionLoader())
1103  {
1104  remove(dylibPath.c_str());
1105  }
1106  else if (isInCurrentProcess())
1107  {
1108  remove(dylibPath.c_str());
1109  }
1110  else
1111  {
1112  remove(executablePath.c_str());
1113  }
1114  }
1115 
1116  delete dependencyLibraries;
1117  dependencyLibraries = NULL;
1118 
1119  stopped = true;
1120  dispatch_semaphore_signal(stoppedSemaphore);
1122  });
1123 }
1124 
1128 void VuoRunner::cleanUpConnections(void)
1129 {
1130  if (! ZMQControl)
1131  return;
1132 
1133  zmq_close(ZMQControl);
1134  ZMQControl = NULL;
1135 
1136  if (ZMQSelfSend)
1137  // Break out of zmq_poll().
1138  vuoSend("VuoRunner::ZMQSelfSend", ZMQSelfSend, 0, nullptr, 0, false, nullptr);
1139 
1140  dispatch_semaphore_wait(endedListeningSemaphore, DISPATCH_TIME_FOREVER);
1141  dispatch_semaphore_signal(endedListeningSemaphore);
1142 }
1143 
1150 {
1151  dispatch_retain(stoppedSemaphore);
1152  dispatch_semaphore_wait(stoppedSemaphore, DISPATCH_TIME_FOREVER);
1153  dispatch_semaphore_signal(stoppedSemaphore);
1154  dispatch_release(stoppedSemaphore);
1155 }
1156 
1172 void VuoRunner::setInputPortValue(string compositionIdentifier, string portIdentifier, json_object *value)
1173 {
1174  const char *valueAsString = json_object_to_json_string_ext(value, JSON_C_TO_STRING_PLAIN);
1175 
1176  dispatch_sync(controlQueue, ^{
1177  if (stopped || lostContact) {
1178  return;
1179  }
1180 
1181  vuoMemoryBarrier();
1182 
1183  try
1184  {
1185  zmq_msg_t messages[3];
1186  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1187  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1188  vuoInitMessageWithString(&messages[2], valueAsString);
1191  }
1192  catch (VuoException &e)
1193  {
1194  stopBecauseLostContact(e.what());
1195  }
1196  });
1197 }
1198 
1212 void VuoRunner::fireTriggerPortEvent(string compositionIdentifier, string portIdentifier)
1213 {
1214  dispatch_sync(controlQueue, ^{
1215  if (stopped || lostContact) {
1216  return;
1217  }
1218 
1219  vuoMemoryBarrier();
1220 
1221  try
1222  {
1223  zmq_msg_t messages[2];
1224  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1225  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1228  }
1229  catch (VuoException &e)
1230  {
1231  stopBecauseLostContact(e.what());
1232  }
1233  });
1234 }
1235 
1249 json_object * VuoRunner::getInputPortValue(string compositionIdentifier, string portIdentifier)
1250 {
1251  __block string valueAsString;
1252  dispatch_sync(controlQueue, ^{
1253  if (stopped || lostContact) {
1254  return;
1255  }
1256 
1257  vuoMemoryBarrier();
1258 
1259  try
1260  {
1261  zmq_msg_t messages[3];
1262  vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1263  vuoInitMessageWithString(&messages[1], compositionIdentifier.c_str());
1264  vuoInitMessageWithString(&messages[2], portIdentifier.c_str());
1267  valueAsString = receiveString("null");
1268  }
1269  catch (VuoException &e)
1270  {
1271  stopBecauseLostContact(e.what());
1272  }
1273  });
1274  return json_tokener_parse(valueAsString.c_str());
1275 }
1276 
1290 json_object * VuoRunner::getOutputPortValue(string compositionIdentifier, string portIdentifier)
1291 {
1292  __block string valueAsString;
1293  dispatch_sync(controlQueue, ^{
1294  if (stopped || lostContact) {
1295  return;
1296  }
1297 
1298  vuoMemoryBarrier();
1299 
1300  try
1301  {
1302  zmq_msg_t messages[3];
1303  vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1304  vuoInitMessageWithString(&messages[1], compositionIdentifier.c_str());
1305  vuoInitMessageWithString(&messages[2], portIdentifier.c_str());
1308  valueAsString = receiveString("null");
1309  }
1310  catch (VuoException &e)
1311  {
1312  stopBecauseLostContact(e.what());
1313  }
1314  });
1315  return json_tokener_parse(valueAsString.c_str());
1316 }
1317 
1331 string VuoRunner::getInputPortSummary(string compositionIdentifier, string portIdentifier)
1332 {
1333  __block string summary;
1334  dispatch_sync(controlQueue, ^{
1335  if (stopped || lostContact) {
1336  return;
1337  }
1338 
1339  vuoMemoryBarrier();
1340 
1341  try
1342  {
1343  zmq_msg_t messages[2];
1344  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1345  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1348  summary = receiveString("");
1349  }
1350  catch (VuoException &e)
1351  {
1352  stopBecauseLostContact(e.what());
1353  }
1354  });
1355  return summary;
1356 }
1357 
1371 string VuoRunner::getOutputPortSummary(string compositionIdentifier, string portIdentifier)
1372 {
1373  __block string summary;
1374  dispatch_sync(controlQueue, ^{
1375  if (stopped || lostContact) {
1376  return;
1377  }
1378 
1379  vuoMemoryBarrier();
1380 
1381  try
1382  {
1383  zmq_msg_t messages[2];
1384  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1385  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1388  summary = receiveString("");
1389  }
1390  catch (VuoException &e)
1391  {
1392  stopBecauseLostContact(e.what());
1393  }
1394  });
1395  return summary;
1396 }
1397 
1411 string VuoRunner::subscribeToInputPortTelemetry(string compositionIdentifier, string portIdentifier)
1412 {
1413  __block string summary;
1414  dispatch_sync(controlQueue, ^{
1415  if (stopped || lostContact) {
1416  return;
1417  }
1418 
1419  vuoMemoryBarrier();
1420 
1421  try
1422  {
1423  zmq_msg_t messages[2];
1424  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1425  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1428  summary = receiveString("");
1429  }
1430  catch (VuoException &e)
1431  {
1432  stopBecauseLostContact(e.what());
1433  }
1434  });
1435  return summary;
1436 }
1437 
1451 string VuoRunner::subscribeToOutputPortTelemetry(string compositionIdentifier, string portIdentifier)
1452 {
1453  __block string summary;
1454  dispatch_sync(controlQueue, ^{
1455  if (stopped || lostContact) {
1456  return;
1457  }
1458 
1459  vuoMemoryBarrier();
1460 
1461  try
1462  {
1463  zmq_msg_t messages[2];
1464  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1465  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1468  summary = receiveString("");
1469  }
1470  catch (VuoException &e)
1471  {
1472  stopBecauseLostContact(e.what());
1473  }
1474  });
1475  return summary;
1476 }
1477 
1488 void VuoRunner::unsubscribeFromInputPortTelemetry(string compositionIdentifier, string portIdentifier)
1489 {
1490  dispatch_sync(controlQueue, ^{
1491  if (stopped || lostContact) {
1492  return;
1493  }
1494 
1495  vuoMemoryBarrier();
1496 
1497  try
1498  {
1499  zmq_msg_t messages[2];
1500  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1501  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1504  }
1505  catch (VuoException &e)
1506  {
1507  stopBecauseLostContact(e.what());
1508  }
1509  });
1510 }
1511 
1522 void VuoRunner::unsubscribeFromOutputPortTelemetry(string compositionIdentifier, string portIdentifier)
1523 {
1524  dispatch_sync(controlQueue, ^{
1525  if (stopped || lostContact) {
1526  return;
1527  }
1528 
1529  vuoMemoryBarrier();
1530 
1531  try
1532  {
1533  zmq_msg_t messages[2];
1534  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1535  vuoInitMessageWithString(&messages[1], portIdentifier.c_str());
1538  }
1539  catch (VuoException &e)
1540  {
1541  stopBecauseLostContact(e.what());
1542  }
1543  });
1544 }
1545 
1556 void VuoRunner::subscribeToEventTelemetry(string compositionIdentifier)
1557 {
1558  dispatch_sync(controlQueue, ^{
1559  if (stopped || lostContact) {
1560  return;
1561  }
1562 
1563  vuoMemoryBarrier();
1564 
1565  try
1566  {
1567  zmq_msg_t messages[1];
1568  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1571  }
1572  catch (VuoException &e)
1573  {
1574  stopBecauseLostContact(e.what());
1575  }
1576  });
1577 }
1578 
1591 void VuoRunner::unsubscribeFromEventTelemetry(string compositionIdentifier)
1592 {
1593  dispatch_sync(controlQueue, ^{
1594  if (stopped || lostContact) {
1595  return;
1596  }
1597 
1598  vuoMemoryBarrier();
1599 
1600  try
1601  {
1602  zmq_msg_t messages[1];
1603  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1606  }
1607  catch (VuoException &e)
1608  {
1609  stopBecauseLostContact(e.what());
1610  }
1611  });
1612 }
1613 
1624 void VuoRunner::subscribeToAllTelemetry(string compositionIdentifier)
1625 {
1626  dispatch_sync(controlQueue, ^{
1627  if (stopped || lostContact) {
1628  return;
1629  }
1630 
1631  vuoMemoryBarrier();
1632 
1633  try
1634  {
1635  zmq_msg_t messages[1];
1636  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1639  }
1640  catch (VuoException &e)
1641  {
1642  stopBecauseLostContact(e.what());
1643  }
1644  });
1645 }
1646 
1659 void VuoRunner::unsubscribeFromAllTelemetry(string compositionIdentifier)
1660 {
1661  dispatch_sync(controlQueue, ^{
1662  if (stopped || lostContact) {
1663  return;
1664  }
1665 
1666  vuoMemoryBarrier();
1667 
1668  try
1669  {
1670  zmq_msg_t messages[1];
1671  vuoInitMessageWithString(&messages[0], compositionIdentifier.c_str());
1674  }
1675  catch (VuoException &e)
1676  {
1677  stopBecauseLostContact(e.what());
1678  }
1679  });
1680 }
1681 
1695 void VuoRunner::setPublishedInputPortValues(map<Port *, json_object *> portsAndValuesToSet)
1696 {
1697  dispatch_sync(controlQueue, ^{
1698  if (stopped || lostContact) {
1699  return;
1700  }
1701 
1702  vuoMemoryBarrier();
1703 
1704  try
1705  {
1706  int messageCount = portsAndValuesToSet.size() * 2;
1707  zmq_msg_t messages[messageCount];
1708 
1709  int i = 0;
1710  for (auto &kv : portsAndValuesToSet)
1711  {
1712  vuoInitMessageWithString(&messages[i++], kv.first->getName().c_str());
1713  vuoInitMessageWithString(&messages[i++], json_object_to_json_string_ext(kv.second, JSON_C_TO_STRING_PLAIN));
1714  }
1715 
1718  }
1719  catch (VuoException &e)
1720  {
1721  stopBecauseLostContact(e.what());
1722  }
1723  });
1724 }
1725 
1734 {
1735  set<VuoRunner::Port *> portAsSet;
1736  portAsSet.insert(port);
1737  firePublishedInputPortEvent(portAsSet);
1738 }
1739 
1750 void VuoRunner::firePublishedInputPortEvent(const set<Port *> &ports)
1751 {
1752  dispatch_sync(controlQueue, ^{
1753  if (stopped || lostContact) {
1754  return;
1755  }
1756 
1757  vuoMemoryBarrier();
1758 
1759  lastFiredEventSignaled = false;
1760 
1761  try
1762  {
1763  size_t messageCount = ports.size() + 1;
1764  zmq_msg_t messages[messageCount];
1765 
1766  vuoInitMessageWithInt(&messages[0], ports.size());
1767  int i = 1;
1768  for (VuoRunner::Port *port : ports) {
1769  vuoInitMessageWithString(&messages[i++], port->getName().c_str());
1770  }
1771 
1774  }
1775  catch (VuoException &e)
1776  {
1777  stopBecauseLostContact(e.what());
1778  }
1779  });
1780 }
1781 
1807 {
1808  saturating_semaphore_wait(lastFiredEventSemaphore, &lastFiredEventSignaled);
1809 }
1810 
1822 {
1823  __block string valueAsString;
1824  dispatch_sync(controlQueue, ^{
1825  if (stopped || lostContact) {
1826  return;
1827  }
1828 
1829  vuoMemoryBarrier();
1830 
1831  try
1832  {
1833  zmq_msg_t messages[2];
1834  vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1835  vuoInitMessageWithString(&messages[1], port->getName().c_str());
1838  valueAsString = receiveString("null");
1839  }
1840  catch (VuoException &e)
1841  {
1842  stopBecauseLostContact(e.what());
1843  }
1844  });
1845  return json_tokener_parse(valueAsString.c_str());
1846 }
1847 
1859 {
1860  __block string valueAsString;
1861  dispatch_sync(controlQueue, ^{
1862  if (stopped || lostContact) {
1863  return;
1864  }
1865 
1866  vuoMemoryBarrier();
1867 
1868  try
1869  {
1870  zmq_msg_t messages[2];
1871  vuoInitMessageWithBool(&messages[0], !isInCurrentProcess());
1872  vuoInitMessageWithString(&messages[1], port->getName().c_str());
1875  valueAsString = receiveString("null");
1876  }
1877  catch (VuoException &e)
1878  {
1879  stopBecauseLostContact(e.what());
1880  }
1881  });
1882  return json_tokener_parse(valueAsString.c_str());
1883 }
1884 
1899 vector<VuoRunner::Port *> VuoRunner::getCachedPublishedPorts(bool input)
1900 {
1901  // Caching not only provides faster access (without zmq messages),
1902  // but also ensures that the VuoRunner::Port pointers passed to
1903  // VuoRunnerDelegate::receivedTelemetryPublishedOutputPortUpdated are consistent.
1904 
1905  if (input)
1906  {
1907  if (! arePublishedInputPortsCached)
1908  {
1909  publishedInputPorts = refreshPublishedPorts(true);
1910  arePublishedInputPortsCached = true;
1911  }
1912  return publishedInputPorts;
1913  }
1914  else
1915  {
1916  if (! arePublishedOutputPortsCached)
1917  {
1918  publishedOutputPorts = refreshPublishedPorts(false);
1919  arePublishedOutputPortsCached = true;
1920  }
1921  return publishedOutputPorts;
1922  }
1923 }
1924 
1936 vector<VuoRunner::Port *> VuoRunner::refreshPublishedPorts(bool input)
1937 {
1938  dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
1939  dispatch_source_t timeout = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, DISPATCH_TIMER_STRICT, queue);
1940  dispatch_source_set_timer(timeout, dispatch_time(DISPATCH_TIME_NOW, 5*NSEC_PER_SEC), NSEC_PER_SEC, NSEC_PER_SEC/10);
1941  dispatch_source_set_event_handler(timeout, ^{
1942  stopBecauseLostContact("The connection between the composition and runner timed out when trying to receive the list of published ports");
1943  dispatch_source_cancel(timeout);
1944  });
1945  dispatch_resume(timeout);
1946 
1947  vector<VuoRunner::Port *> ports;
1948 
1949  try
1950  {
1951  vuoMemoryBarrier();
1952 
1953  enum VuoControlRequest requests[4];
1954  enum VuoControlReply replies[4];
1955  if (input)
1956  {
1963  }
1964  else
1965  {
1972  }
1973 
1974  vector<string> names;
1975  vector<string> types;
1976  vector<string> details;
1977 
1978  for (int i = 0; i < 3; ++i)
1979  {
1980  vuoControlRequestSend(requests[i], NULL, 0);
1981  vuoControlReplyReceive(replies[i]);
1982  vector<string> messageStrings = receiveListOfStrings();
1983  if (i == 0)
1984  names = messageStrings;
1985  else if (i == 1)
1986  types = messageStrings;
1987  else
1988  details = messageStrings;
1989  }
1990 
1991  for (size_t i = 0; i < names.size() && i < types.size() && i < details.size(); ++i)
1992  {
1993  VuoRunner::Port *port = new Port(names[i], types[i], json_tokener_parse(details[i].c_str()));
1994  ports.push_back(port);
1995  }
1996  }
1997  catch (...)
1998  {
1999  dispatch_source_cancel(timeout);
2000  dispatch_release(timeout);
2001  throw;
2002  }
2003 
2004  dispatch_source_cancel(timeout);
2005  dispatch_release(timeout);
2006 
2007  return ports;
2008 }
2009 
2019 vector<VuoRunner::Port *> VuoRunner::getPublishedInputPorts(void)
2020 {
2021  return getCachedPublishedPorts(true);
2022 }
2023 
2033 vector<VuoRunner::Port *> VuoRunner::getPublishedOutputPorts(void)
2034 {
2035  return getCachedPublishedPorts(false);
2036 }
2037 
2048 {
2049  vector<VuoRunner::Port *> inputPorts = getPublishedInputPorts();
2050  for (vector<VuoRunner::Port *>::iterator i = inputPorts.begin(); i != inputPorts.end(); ++i)
2051  if ((*i)->getName() == name)
2052  return *i;
2053 
2054  return NULL;
2055 }
2056 
2067 {
2068  vector<VuoRunner::Port *> outputPorts = getPublishedOutputPorts();
2069  for (vector<VuoRunner::Port *>::iterator i = outputPorts.begin(); i != outputPorts.end(); ++i)
2070  if ((*i)->getName() == name)
2071  return *i;
2072 
2073  return NULL;
2074 }
2075 
2087 void VuoRunner::listen()
2088 {
2089  // Name this thread.
2090  {
2091  const char *compositionName = dylibPath.empty() ? executablePath.c_str() : dylibPath.c_str();
2092 
2093  // Trim the path, if present.
2094  if (const char *lastSlash = strrchr(compositionName, '/'))
2095  compositionName = lastSlash + 1;
2096 
2097  char threadName[MAXTHREADNAMESIZE];
2098  snprintf(threadName, MAXTHREADNAMESIZE, "org.vuo.runner.telemetry: %s", compositionName);
2099  pthread_setname_np(threadName);
2100  }
2101 
2102  ZMQSelfReceive = zmq_socket(ZMQContext, ZMQ_PAIR);
2103  VuoRunner_configureSocket(ZMQSelfReceive, -1);
2104  if (zmq_bind(ZMQSelfReceive, "inproc://vuo-runner-self") != 0)
2105  {
2106  listenError = strerror(errno);
2107  dispatch_semaphore_signal(beganListeningSemaphore);
2108  return;
2109  }
2110 
2111  ZMQSelfSend = zmq_socket(ZMQContext, ZMQ_PAIR);
2112  VuoRunner_configureSocket(ZMQSelfSend, -1);
2113  if (zmq_connect(ZMQSelfSend, "inproc://vuo-runner-self") != 0)
2114  {
2115  listenError = strerror(errno);
2116  dispatch_semaphore_signal(beganListeningSemaphore);
2117  return;
2118  }
2119 
2120  {
2121  ZMQTelemetry = zmq_socket(ZMQContext,ZMQ_SUB);
2122  VuoRunner_configureSocket(ZMQTelemetry, -1);
2123  if(zmq_connect(ZMQTelemetry,ZMQTelemetryURL.c_str()))
2124  {
2125  listenError = strerror(errno);
2126  dispatch_semaphore_signal(beganListeningSemaphore);
2127  return;
2128  }
2129  }
2130 
2131  {
2132  // subscribe to all types of telemetry
2133  char type = VuoTelemetryStats;
2134  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2136  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2138  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2140  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2142  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2144  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2146  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2147  type = VuoTelemetryEventDropped;
2148  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2149  type = VuoTelemetryError;
2150  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2152  zmq_setsockopt(ZMQTelemetry, ZMQ_SUBSCRIBE, &type, sizeof type);
2153  }
2154 
2155  {
2156  // Wait until the connection is established, as evidenced by a heartbeat telemetry message
2157  // being received from the composition. This is necessary because the ØMQ API doesn't provide
2158  // any way to tell when a SUB socket is ready to receive messages, and if you call zmq_poll()
2159  // on it before it's ready, then it might miss messages that came in while it was still trying
2160  // to get ready. (The zmq_connect() function doesn't make any guarantees about the socket being ready.
2161  // It just starts some setup that may continue asynchronously after zmq_connect() has returned.)
2162  // To avoid missing important telemetry messages from the composition, we make sure that the
2163  // runner doesn't tell the composition to unpause until the runner has verified that it's
2164  // receiving heartbeat telemetry messages. http://zguide.zeromq.org/page:all#Node-Coordination
2165  zmq_pollitem_t items[]=
2166  {
2167  {ZMQTelemetry,0,ZMQ_POLLIN,0},
2168  };
2169  int itemCount = 1;
2170  long timeout = -1;
2171  zmq_poll(items,itemCount,timeout);
2172  }
2173 
2174  dispatch_semaphore_signal(beganListeningSemaphore);
2175 
2176  bool pendingCancel = false;
2177  while(! listenCanceled)
2178  {
2179  zmq_pollitem_t items[]=
2180  {
2181  {ZMQTelemetry,0,ZMQ_POLLIN,0},
2182  {ZMQSelfReceive,0,ZMQ_POLLIN,0},
2183  };
2184  int itemCount = 2;
2185 
2186  // Wait 1 second. If no telemetry was received in that second, we probably lost contact with the composition.
2187  long timeout = pendingCancel ? USEC_PER_SEC / 10 : USEC_PER_SEC;
2188  zmq_poll(items,itemCount,timeout);
2189  if(items[0].revents & ZMQ_POLLIN)
2190  {
2191  // Receive telemetry type.
2192  char type = vuoReceiveInt(ZMQTelemetry, NULL);
2193 
2194  // Receive telemetry arguments and forward to VuoRunnerDelegate.
2195  switch (type)
2196  {
2197  case VuoTelemetryStats:
2198  {
2199  unsigned long utime = vuoReceiveUnsignedInt64(ZMQTelemetry, NULL);
2200  unsigned long stime = vuoReceiveUnsignedInt64(ZMQTelemetry, NULL);
2201  dispatch_sync(delegateQueue, ^{
2202  if (delegate)
2203  delegate->receivedTelemetryStats(utime, stime);
2204  });
2205  break;
2206  }
2208  {
2209  char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2210  char *nodeIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2211  dispatch_sync(delegateQueue, ^{
2212  if (delegate)
2213  delegate->receivedTelemetryNodeExecutionStarted(compositionIdentifier, nodeIdentifier);
2214  });
2215  free(compositionIdentifier);
2216  free(nodeIdentifier);
2217  break;
2218  }
2220  {
2221  char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2222  char *nodeIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2223  dispatch_sync(delegateQueue, ^{
2224  if (delegate)
2225  delegate->receivedTelemetryNodeExecutionFinished(compositionIdentifier, nodeIdentifier);
2226  });
2227  free(compositionIdentifier);
2228  free(nodeIdentifier);
2229  break;
2230  }
2232  {
2233  while (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2234  {
2235  char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2236  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2237  {
2238  char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2239  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2240  {
2241  bool receivedEvent = vuoReceiveBool(ZMQTelemetry, NULL);
2242  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2243  {
2244  bool receivedData = vuoReceiveBool(ZMQTelemetry, NULL);
2245  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2246  {
2247  string portDataSummary;
2248  char *s = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2249  if (s)
2250  {
2251  portDataSummary = s;
2252  free(s);
2253  }
2254  else
2255  portDataSummary = "";
2256 
2257  dispatch_sync(delegateQueue, ^{
2258  if (delegate)
2259  delegate->receivedTelemetryInputPortUpdated(compositionIdentifier, portIdentifier, receivedEvent, receivedData, portDataSummary);
2260  });
2261  }
2262  }
2263  }
2264  free(portIdentifier);
2265  }
2266  free(compositionIdentifier);
2267  }
2268  break;
2269  }
2271  {
2272  while (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2273  {
2274  char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2275  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2276  {
2277  char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2278  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2279  {
2280  bool sentEvent = vuoReceiveBool(ZMQTelemetry, NULL);
2281  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2282  {
2283  bool sentData = vuoReceiveBool(ZMQTelemetry, NULL);
2284  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2285  {
2286  string portDataSummary;
2287  char *s = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2288  if (s)
2289  {
2290  portDataSummary = s;
2291  free(s);
2292  }
2293  else
2294  portDataSummary = "";
2295 
2296  dispatch_sync(delegateQueue, ^{
2297  if (delegate)
2298  delegate->receivedTelemetryOutputPortUpdated(compositionIdentifier, portIdentifier, sentEvent, sentData, portDataSummary);
2299  });
2300  }
2301  }
2302  }
2303  free(portIdentifier);
2304  }
2305  free(compositionIdentifier);
2306  }
2307  break;
2308  }
2310  {
2311  while (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2312  {
2313  char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2314  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2315  {
2316  bool sentData = vuoReceiveBool(ZMQTelemetry, NULL);
2317  if (VuoTelemetry_hasMoreToReceive(ZMQTelemetry))
2318  {
2319  string portDataSummary;
2320  char *s = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2321  if (s)
2322  {
2323  portDataSummary = s;
2324  free(s);
2325  }
2326  else
2327  portDataSummary = "";
2328 
2329  Port *port = getPublishedOutputPortWithName(portIdentifier);
2330 
2331  dispatch_sync(delegateQueue, ^{
2332  if (delegate)
2333  delegate->receivedTelemetryPublishedOutputPortUpdated(port, sentData, portDataSummary);
2334  });
2335  }
2336  }
2337  free(portIdentifier);
2338  }
2339  break;
2340  }
2342  {
2343  saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2344  break;
2345  }
2347  {
2348  char *compositionIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2349  char *portIdentifier = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2350  dispatch_sync(delegateQueue, ^{
2351  if (delegate)
2352  delegate->receivedTelemetryEventDropped(compositionIdentifier, portIdentifier);
2353  });
2354  free(compositionIdentifier);
2355  free(portIdentifier);
2356  break;
2357  }
2358  case VuoTelemetryError:
2359  {
2360  char *message = vuoReceiveAndCopyString(ZMQTelemetry, NULL);
2361  dispatch_sync(delegateQueue, ^{
2362  if (delegate)
2363  delegate->receivedTelemetryError( string(message) );
2364  });
2365  free(message);
2366  break;
2367  }
2369  {
2370  dispatch_sync(delegateQueue, ^{
2371  if (delegate)
2372  delegate->lostContactWithComposition();
2373  });
2374  dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2375  stop();
2376  });
2377  break;
2378  }
2379  }
2380  }
2381  else if (! listenCanceled) // Either the 1-second timeout elapsed, or we got a stop-listening message from ZMQSelfSend
2382  {
2383  if (items[1].revents & ZMQ_POLLIN)
2384  {
2385  // This is a stop-listening message.
2386  vuoReceiveInt(ZMQSelfReceive, NULL);
2387 
2388  // Drain any remaining telemetry messages.
2389  pendingCancel = true;
2390  }
2391 
2392  else if (pendingCancel)
2393  listenCanceled = true;
2394 
2395  else
2396  {
2397  // Timeout.
2398  listenCanceled = true;
2399  string dir, file, ext;
2400  VuoFileUtilities::splitPath(executablePath, dir, file, ext);
2401  stopBecauseLostContact("The connection between the composition ('" + file + "') and runner timed out while listening for telemetry.");
2402  }
2403  }
2404  }
2405 
2406  zmq_close(ZMQTelemetry);
2407  ZMQTelemetry = NULL;
2408 
2409  zmq_close(ZMQSelfSend);
2410  ZMQSelfSend = NULL;
2411  zmq_close(ZMQSelfReceive);
2412  ZMQSelfReceive = NULL;
2413 
2414  dispatch_semaphore_signal(endedListeningSemaphore);
2415  return;
2416 }
2417 
2427 void VuoRunner::vuoControlRequestSend(enum VuoControlRequest request, zmq_msg_t *messages, unsigned int messageCount)
2428 {
2429  char *error = NULL;
2430  vuoSend("runner VuoControl",ZMQControl,request,messages,messageCount,false,&error);
2431 
2432  if (error)
2433  {
2434  string e(error);
2435  free(error);
2436  throw VuoException(e);
2437  }
2438 }
2439 
2449 void VuoRunner::vuoLoaderControlRequestSend(enum VuoLoaderControlRequest request, zmq_msg_t *messages, unsigned int messageCount)
2450 {
2451  char *error = NULL;
2452  vuoSend("runner VuoLoaderControl",ZMQLoaderControl,request,messages,messageCount,false,&error);
2453 
2454  if (error)
2455  {
2456  string e(error);
2457  free(error);
2458  throw VuoException(e);
2459  }
2460 }
2461 
2470 void VuoRunner::vuoControlReplyReceive(enum VuoControlReply expectedReply)
2471 {
2472  char *error = NULL;
2473  int reply = vuoReceiveInt(ZMQControl, &error);
2474 
2475  if (error)
2476  {
2477  string e(error);
2478  free(error);
2479  ostringstream oss;
2480  oss << e << " (expected " << expectedReply << ")";
2481  throw VuoException(oss.str());
2482  }
2483  else if (reply != expectedReply)
2484  {
2485  ostringstream oss;
2486  oss << "The runner received the wrong message from the composition (expected " << expectedReply << ", received " << reply << ")";
2487  throw VuoException(oss.str());
2488  }
2489 }
2490 
2499 void VuoRunner::vuoLoaderControlReplyReceive(enum VuoLoaderControlReply expectedReply)
2500 {
2501  char *error = NULL;
2502  int reply = vuoReceiveInt(ZMQLoaderControl, &error);
2503 
2504  if (error)
2505  {
2506  string e(error);
2507  free(error);
2508  ostringstream oss;
2509  oss << e << " (expected " << expectedReply << ")";
2510  throw VuoException(oss.str());
2511  }
2512  else if (reply != expectedReply)
2513  {
2514  ostringstream oss;
2515  oss << "The runner received the wrong message from the composition loader (expected " << expectedReply << ", received " << reply << ")";
2516  throw VuoException(oss.str());
2517  }
2518 }
2519 
2525 string VuoRunner::receiveString(string fallbackIfNull)
2526 {
2527  char *error = NULL;
2528  char *s = vuoReceiveAndCopyString(ZMQControl, &error);
2529 
2530  if (error)
2531  {
2532  string e(error);
2533  free(error);
2534  throw VuoException(e);
2535  }
2536 
2537  string ret;
2538  if (s)
2539  {
2540  ret = s;
2541  free(s);
2542  }
2543  else
2544  ret = fallbackIfNull;
2545 
2546  return ret;
2547 }
2548 
2552 vector<string> VuoRunner::receiveListOfStrings(void)
2553 {
2554  vector<string> messageStrings;
2556  {
2557  string s = receiveString("");
2558  messageStrings.push_back(s);
2559  }
2560  return messageStrings;
2561 }
2562 
2568 void VuoRunner::saturating_semaphore_signal(dispatch_semaphore_t dsema, bool *signaled)
2569 {
2570  if (__sync_bool_compare_and_swap(signaled, false, true))
2571  dispatch_semaphore_signal(dsema);
2572 }
2573 
2579 void VuoRunner::saturating_semaphore_wait(dispatch_semaphore_t dsema, bool *signaled)
2580 {
2581  *signaled = false;
2582  dispatch_semaphore_wait(dsema, DISPATCH_TIME_FOREVER);
2583 }
2584 
2589 {
2590  return stopped;
2591 }
2592 
2596 bool VuoRunner::isInCurrentProcess(void)
2597 {
2598  return executablePath.empty();
2599 }
2600 
2605 bool VuoRunner::isUsingCompositionLoader(void)
2606 {
2607  return ! executablePath.empty() && ! dylibPath.empty();
2608 }
2609 
2614 {
2615  dispatch_sync(delegateQueue, ^{
2616  this->delegate = delegate;
2617  });
2618 }
2619 
2623 void VuoRunner::stopBecauseLostContact(string errorMessage)
2624 {
2625  __block bool alreadyLostContact;
2626  dispatch_sync(delegateQueue, ^{
2627  alreadyLostContact = lostContact;
2628  lostContact = true;
2629  });
2630 
2631  if (alreadyLostContact)
2632  return;
2633 
2634  saturating_semaphore_signal(lastFiredEventSemaphore, &lastFiredEventSignaled);
2635 
2636  dispatch_sync(delegateQueue, ^{
2637  if (delegate)
2638  delegate->lostContactWithComposition();
2639  });
2640 
2641  dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2642  stop();
2643  });
2644 
2645  if (! isInCurrentProcess())
2646  {
2647  // Normally, stop() is responsible for terminating the ZMQ context.
2648  // But, if stopBecauseLostContact() is called, it takes the responsibility away from stop().
2649  // If there's an in-progress zmq_recv() call, stop() will get stuck waiting on controlQueue, so
2650  // the below call to terminate the ZMQ context interrupts zmq_recv() and allows stop() to proceed.
2651  dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
2652  vuoMemoryBarrier();
2653 
2654  zmq_term(ZMQContext);
2655  ZMQContext = NULL;
2656  dispatch_semaphore_signal(terminatedZMQContextSemaphore);
2657  });
2658  }
2659 
2660  VUserLog("%s", errorMessage.c_str());
2661 }
2662 
2669 {
2670  return compositionPid;
2671 }
2672 
2680 VuoRunner::Port::Port(string name, string type, json_object *details)
2681 {
2682  this->name = name;
2683  this->type = type;
2684  this->details = details;
2685 }
2686 
2691 {
2692  return name;
2693 }
2694 
2699 {
2700  return type;
2701 }
2702 
2728 {
2729  return details;
2730 }
2731 
2732 VuoRunnerDelegate::~VuoRunnerDelegate() { } // Fixes "undefined symbols" error