Vuo  2.0.0
VuoOsc.cc
Go to the documentation of this file.
1 
10 #include "VuoOsc.h"
11 #include "VuoPool.hh"
12 
13 #include <map>
14 #include <set>
15 #include <vector>
16 #include <string>
17 #include <sys/socket.h>
18 #include <netinet/in.h>
19 #include <ifaddrs.h>
20 #include <oscpack/osc/OscOutboundPacketStream.h>
21 #include <oscpack/osc/OscPacketListener.h>
22 #include <oscpack/ip/UdpSocket.h>
23 #include <CoreServices/CoreServices.h>
24 
25 #pragma clang diagnostic push
26 #pragma clang diagnostic ignored "-Wdocumentation"
27 #include <json-c/json.h>
28 #pragma clang diagnostic pop
29 
30 extern "C"
31 {
32 #include "module.h"
33 
34 #ifdef VUO_COMPILER
36  "title" : "VuoOsc",
37  "dependencies" : [
38  "VuoOscMessage",
39  "VuoOscInputDevice",
40  "VuoOscOutputDevice",
41  "VuoList_VuoOscMessage",
42  "CoreServices.framework",
43  "oscpack"
44  ]
45  });
46 #endif
47 }
48 
49 
51 
57 class VuoOscInPacketListener : public osc::OscPacketListener
58 {
59 public:
61  {
62  triggerSemaphore = dispatch_semaphore_create(1);
63  }
64 
69  {
70  dispatch_semaphore_wait(triggerSemaphore, DISPATCH_TIME_FOREVER);
71  triggers.insert(receivedMessage);
72  dispatch_semaphore_signal(triggerSemaphore);
73  }
74 
79  {
80  dispatch_semaphore_wait(triggerSemaphore, DISPATCH_TIME_FOREVER);
81  triggers.erase(receivedMessage);
82  dispatch_semaphore_signal(triggerSemaphore);
83  }
84 
88  unsigned int triggerCount(void)
89  {
90  dispatch_semaphore_wait(triggerSemaphore, DISPATCH_TIME_FOREVER);
91  unsigned int size = triggers.size();
92  dispatch_semaphore_signal(triggerSemaphore);
93  return size;
94  }
95 
96 protected:
97  std::set<VuoOscReceivedMessageTrigger> triggers;
98  dispatch_semaphore_t triggerSemaphore;
99 
103  virtual void ProcessBundle(const osc::ReceivedBundle& b, const IpEndpointName& remoteEndpoint)
104  {
105 // VLog("%ld with %d elements", b.TimeTag(), b.ElementCount());
106 
107  for (osc::ReceivedBundle::const_iterator i = b.ElementsBegin(); i != b.ElementsEnd(); ++i)
108  {
109  if (i->IsBundle())
110  {
111 // VLog(" contains bundle");
112  ProcessBundle(osc::ReceivedBundle(*i), remoteEndpoint);
113  }
114  else
115  {
116 // VLog(" contains message");
117  ProcessMessage(osc::ReceivedMessage(*i), remoteEndpoint);
118  }
119  }
120  }
121 
125  virtual void ProcessMessage(const osc::ReceivedMessage &m, const IpEndpointName &remoteEndpoint)
126  {
127  if (!triggerCount())
128  return;
129 
130  try
131  {
132  // Convert message arguments into a JSON object.
135  unsigned int i = 0;
136  for (osc::ReceivedMessage::const_iterator arg = m.ArgumentsBegin(); arg != m.ArgumentsEnd(); ++arg, ++i)
137  {
138  switch (arg->TypeTag())
139  {
140  case osc::NIL_TYPE_TAG:
141  data[i] = NULL;
142  dataTypes[i] = VuoOscType_Auto;
143  break;
144  case osc::TRUE_TYPE_TAG:
145  data[i] = json_object_new_boolean(true);
146  dataTypes[i] = VuoOscType_Auto;
147  break;
148  case osc::FALSE_TYPE_TAG:
149  data[i] = json_object_new_boolean(false);
150  dataTypes[i] = VuoOscType_Auto;
151  break;
152  case osc::FLOAT_TYPE_TAG:
153  data[i] = json_object_new_double(arg->AsFloat());
154  dataTypes[i] = VuoOscType_Float32;
155  break;
156  case osc::DOUBLE_TYPE_TAG:
157  data[i] = json_object_new_double(arg->AsDouble());
158  dataTypes[i] = VuoOscType_Auto;
159  break;
160  case osc::INT32_TYPE_TAG:
161  data[i] = json_object_new_int(arg->AsInt32());
162  dataTypes[i] = VuoOscType_Int32;
163  break;
164  case osc::INT64_TYPE_TAG:
165  data[i] = json_object_new_int64(arg->AsInt64());
166  dataTypes[i] = VuoOscType_Auto;
167  break;
168  case osc::STRING_TYPE_TAG:
169  data[i] = json_object_new_string(arg->AsString());
170  dataTypes[i] = VuoOscType_Auto;
171  break;
172  default:
173  throw osc::Exception(((std::string)"unknown argument type tag '" + arg->TypeTag() + "'").c_str());
174  }
175  }
176 
177  VuoOscMessage vuoMessage = VuoOscMessage_make(VuoText_make(m.AddressPattern()), i, data, dataTypes);
178 
179  // Send it to all listening nodes.
180  dispatch_semaphore_wait(triggerSemaphore, DISPATCH_TIME_FOREVER);
181  for (std::set<VuoOscReceivedMessageTrigger>::iterator it = triggers.begin(); it != triggers.end(); ++it)
182  (*it)(vuoMessage);
183  dispatch_semaphore_signal(triggerSemaphore);
184  }
185  catch (osc::Exception &e)
186  {
187  VUserLog("Error parsing message: %s: %s", m.AddressPattern(), e.what());
188  }
189  }
190 };
191 
196 static void registerCallback(CFNetServiceRef theService, CFStreamError *error, void *info)
197 {
198 }
199 
203 static CFNetServiceRef VuoOsc_createNetService(char *name, int port, bool isServer)
204 {
205  bool defaultName = false;
206  if (VuoText_isEmpty(name))
207  {
208  name = VuoText_format("Vuo OSC %s", isServer ? "Server" : "Client");
209  defaultName = true;
210  }
211 
212  CFStringRef nameCF = CFStringCreateWithCString(NULL, name, kCFStringEncodingUTF8);
213  if (defaultName)
214  free(name);
215  if (!nameCF)
216  return NULL;
217 
218  CFNetServiceRef netService = CFNetServiceCreate(NULL, CFSTR(""), CFSTR("_osc._udp"), nameCF, port);
219  CFRelease(nameCF);
220 
221  // Add a TXT record, type=server, per http://opensoundcontrol.org/topic/110 proposal #2.
222  {
223  CFStringRef keys[] = { CFSTR("type") };
224  CFTypeRef values[] = { isServer ? CFSTR("server") : CFSTR("client") };
225  CFDictionaryRef attr = CFDictionaryCreate(NULL, (const void **)&keys, (const void **)&values, sizeof(keys) / sizeof(keys[0]), NULL, NULL);
226  CFDataRef txtRecord = CFNetServiceCreateTXTDataWithDictionary( NULL, attr );
227  CFRelease(attr);
228 
229  CFNetServiceSetTXTData(netService, txtRecord);
230 
231  CFRelease(txtRecord);
232  }
233 
234  // Start advertising the Bonjour service.
235  CFNetServiceClientContext clientContext = { 0, NULL, NULL, NULL, NULL };
236  CFNetServiceSetClient(netService, registerCallback, &clientContext);
237  CFNetServiceScheduleWithRunLoop(netService, CFRunLoopGetCurrent(), kCFRunLoopCommonModes);
238  CFStreamError error;
239  if (CFNetServiceRegisterWithOptions(netService, kCFNetServiceFlagNoAutoRename, &error) == false)
240  {
241  CFNetServiceUnscheduleFromRunLoop(netService, CFRunLoopGetCurrent(), kCFRunLoopCommonModes);
242  CFNetServiceSetClient(netService, NULL, NULL);
243  CFNetServiceCancel(netService);
244  CFRelease(netService);
245  VUserLog("Error: Could not advertise OSC %s via Bonjour (domain=%ld, error=%d)", isServer ? "server" : "client", error.domain, error.error);
246  return NULL;
247  }
248 
249  return netService;
250 }
251 
255 static void VuoOsc_destroyNetService(CFNetServiceRef netService)
256 {
257  CFNetServiceUnscheduleFromRunLoop(netService, CFRunLoopGetCurrent(), kCFRunLoopCommonModes);
258  CFNetServiceSetClient(netService, NULL, NULL);
259  CFNetServiceCancel(netService);
260  CFRelease(netService);
261 }
262 
267 class VuoOscInSocket : public UdpSocket
268 {
269  SocketReceiveMultiplexer mux_;
270  VuoOscInPacketListener *listener_;
271  CFNetServiceRef netService;
272  int uses = 0;
273  bool stopping = false;
274  dispatch_queue_t queue;
275 
276 public:
281  void listenForMessages(void)
282  {
283  while (!stopping)
284  {
285  try
286  {
287  mux_.Run();
288  }
289  catch (osc::MalformedPacketException e)
290  {
291  VUserLog("Malformed OSC packet: %s", e.what());
292  }
293  catch (osc::MalformedBundleException e)
294  {
295  VUserLog("Malformed OSC bundle: %s", e.what());
296  }
297  catch (osc::MalformedMessageException e)
298  {
299  VUserLog("Malformed OSC message: %s", e.what());
300  }
301  catch (osc::WrongArgumentTypeException e)
302  {
303  VUserLog("OSC: Wrong argument type: %s", e.what());
304  }
305  catch (osc::MissingArgumentException e)
306  {
307  VUserLog("OSC: Missing argument: %s", e.what());
308  }
309  catch (osc::ExcessArgumentException e)
310  {
311  VUserLog("OSC: Excess argument: %s", e.what());
312  }
313  catch (std::runtime_error &e)
314  {
315  VUserLog("OSC runtime error: %s", e.what());
316  }
317  catch (std::exception &e)
318  {
319  VUserLog("OSC exception: %s", e.what());
320  }
321  catch (...)
322  {
323  VUserLog("Unknown OSC exception");
324  }
325  }
326  }
327 
331  VuoOscInSocket(const IpEndpointName &localEndpoint, VuoOscInPacketListener *listener, char *name)
332  : UdpSocket(), listener_(listener)
333  {
334  netService = NULL;
335  SetAllowReuse(true);
336 
337  try
338  {
339  Bind(localEndpoint);
340  }
341  catch(std::exception const &e)
342  {
343  VUserLog("Error: (port %d) %s", localEndpoint.port, e.what());
344  return;
345  }
346 
347  mux_.AttachSocketListener(this, listener_);
348 
349  netService = VuoOsc_createNetService(name, Port(), true);
350 
351  queue = dispatch_queue_create("org.vuo.osc.receive", NULL);
352 
353  dispatch_async(queue, ^{
355  });
356  }
357 
361  void use(void)
362  {
363  ++uses;
364  }
365 
369  void disuse(void)
370  {
371  --uses;
372  }
373 
377  int useCount(void)
378  {
379  return uses;
380  }
381 
386  {
387  return listener_;
388  }
389 
390  ~VuoOscInSocket()
391  {
392  if (netService)
393  VuoOsc_destroyNetService(netService);
394 
395  if (IsBound())
396  {
397  stopping = true;
398  mux_.AsynchronousBreak();
399  mux_.DetachSocketListener(this, listener_);
400 
401  dispatch_sync(queue, ^{});
402  dispatch_release(queue);
403  }
404 
405  delete listener_;
406  }
407 };
408 
409 std::map<unsigned int, VuoOscInSocket *> VuoOscInPool;
410 dispatch_semaphore_t VuoOscInPool_semaphore = NULL;
411 
415 __attribute__((constructor)) static void VuoOscIn_init(void)
416 {
417  VuoOscInPool_semaphore = dispatch_semaphore_create(1);
418 }
419 
424 {
427 };
428 
429 void VuoOscIn_destroy(VuoOscIn oi);
430 
437 {
438  struct VuoOscIn_internal *oii;
439  oii = (struct VuoOscIn_internal *)calloc(1, sizeof(struct VuoOscIn_internal));
441 
442  oii->device = device;
444 
445  dispatch_semaphore_wait(VuoOscInPool_semaphore, DISPATCH_TIME_FOREVER);
446  {
447  if (VuoOscInPool.find(oii->device.port) == VuoOscInPool.end())
448  {
449 // VLog("No socket found for port %lld; creating one.", oii->device.port);
451  IpEndpointName endpoint(IpEndpointName::ANY_ADDRESS, oii->device.port==0 ? IpEndpointName::ANY_PORT : oii->device.port);
452  VuoOscInSocket *socket = new VuoOscInSocket(endpoint, listener, (char *)oii->device.name);
453  VuoOscInPool[oii->device.port] = socket;
454  }
455  VuoOscInPool[oii->device.port]->use();
456  }
457  dispatch_semaphore_signal(VuoOscInPool_semaphore);
458 
459  return (VuoOscIn)oii;
460 }
461 
468 (
469  VuoOscIn oi,
471 )
472 {
473  if (!oi)
474  return;
475 
476  struct VuoOscIn_internal *oii = (struct VuoOscIn_internal *)oi;
478 
479  dispatch_semaphore_wait(VuoOscInPool_semaphore, DISPATCH_TIME_FOREVER);
480  {
481 // VLog("Enabling trigger %p", receivedMessage);
482  VuoOscInPool[oii->device.port]->listener()->enableTrigger(receivedMessage);
483  }
484  dispatch_semaphore_signal(VuoOscInPool_semaphore);
485 }
486 
493 {
494  if (!oi)
495  return;
496 
497  struct VuoOscIn_internal *oii = (struct VuoOscIn_internal *)oi;
498 
499  dispatch_semaphore_wait(VuoOscInPool_semaphore, DISPATCH_TIME_FOREVER);
500  {
501  VuoOscInPool[oii->device.port]->listener()->disableTrigger(oii->receivedMessage);
502 // VLog("Disabled trigger %p", oii->receivedMessage);
503  }
504  dispatch_semaphore_signal(VuoOscInPool_semaphore);
505 
506  oii->receivedMessage = NULL;
507 }
508 
513 {
514  if (!oi)
515  return;
516 
517  struct VuoOscIn_internal *oii = (struct VuoOscIn_internal *)oi;
518 
519  dispatch_semaphore_wait(VuoOscInPool_semaphore, DISPATCH_TIME_FOREVER);
520  {
521  VuoOscInPool[oii->device.port]->disuse();
522  if (!VuoOscInPool[oii->device.port]->useCount())
523  {
524 // VLog("Deleting socket for port %lld.", oii->device.port);
525  std::map<unsigned int, VuoOscInSocket *>::iterator it = VuoOscInPool.find(oii->device.port);
526  delete it->second;
527  VuoOscInPool.erase(it);
528  }
529  }
530  dispatch_semaphore_signal(VuoOscInPool_semaphore);
531 
533 
534  free(oi);
535 }
536 
541 {
542 public:
544 
549  : device(_device)
550  {
551  }
552 
557  bool operator < (const VuoOscOutputIdentifier &other) const
558  {
560  }
561 };
562 
566 typedef struct _VuoOscOut_internal
567 {
569  std::vector<UdpTransmitSocket *> sockets;
570  std::vector<CFNetServiceRef> netServices;
571  dispatch_queue_t queue;
573 
578 {
579  int socketFD = socket(PF_INET, SOCK_DGRAM, 0);
580  if (socketFD == -1)
581  {
582  VUserLog("Couldn't open port: %s", strerror(errno));
583  return 0;
584  }
585 
586  struct sockaddr_in addr;
587  memset(&addr, 0, sizeof(addr));
588  addr.sin_family = PF_INET;
589  addr.sin_addr.s_addr = INADDR_ANY;
590 
591  if (bind(socketFD, (struct sockaddr *)&addr, sizeof(addr)) == -1)
592  {
593  VUserLog("Couldn't bind: %s", strerror(errno));
594  close(socketFD);
595  return 0;
596  }
597 
598  struct sockaddr_in sin;
599  socklen_t len = sizeof(sin);
600  if (getsockname(socketFD, (struct sockaddr *)&sin, &len) == -1)
601  {
602  VUserLog("Couldn't get socket name: %s", strerror(errno));
603  close(socketFD);
604  return 0;
605  }
606 
607  close(socketFD);
608 
609  return ntohs(sin.sin_port);
610 }
611 
614 static void VuoOscOut_destroy(VuoOscOut_internal ai);
616 {
617  if (VuoIsDebugEnabled())
618  {
619  char *summary = VuoOscOutputDevice_getSummary(device.device);
620  VUserLog("%s", summary);
621  free(summary);
622  }
623 
624  VuoInteger actualPort = device.device.port;
625  if (!actualPort)
626  actualPort = VuoOsc_findAvailableUdpPort();
627 
628  std::vector<IpEndpointName *> endpoints;
629  if (device.device.ipAddress)
630  endpoints.push_back(new IpEndpointName(device.device.ipAddress, actualPort));
631  else
632  {
633  // If we don't have a specific IP address,
634  // then we should broadcast to all interfaces (not just the default interface).
635 
636  struct ifaddrs *interfaces;
637  if (getifaddrs(&interfaces))
638  {
639  VUserLog("Using link-local broadcast, since I couldn't get a list of host interfaces: %s", strerror(errno));
640  endpoints.push_back(new IpEndpointName("255.255.255.255", actualPort));
641  }
642  else
643  {
644  for (struct ifaddrs *address = interfaces; address; address = address->ifa_next)
645  {
646  // Only add IPv4 interfaces.
647  if (address->ifa_addr->sa_family != AF_INET)
648  continue;
649 
650  struct sockaddr_in *ip = (struct sockaddr_in *)address->ifa_dstaddr;
651  endpoints.push_back(new IpEndpointName(ntohl(ip->sin_addr.s_addr), actualPort));
652  }
653  freeifaddrs(interfaces);
654  }
655  }
656 
657 
658  std::vector<UdpTransmitSocket *> sockets;
659  std::vector<CFNetServiceRef> netServices;
660  for (std::vector<IpEndpointName *>::iterator endpoint = endpoints.begin(); endpoint != endpoints.end(); ++endpoint)
661  {
662  try
663  {
664  if (VuoIsDebugEnabled())
665  {
666  char s[IpEndpointName::ADDRESS_AND_PORT_STRING_LENGTH];
667  (*endpoint)->AddressAndPortAsString(s);
668  VUserLog("\t%s", s);
669  }
670  UdpTransmitSocket *socket = new UdpTransmitSocket(**endpoint);
671  socket->SetEnableBroadcast(true);
672  sockets.push_back(socket);
673 
674  netServices.push_back(VuoOsc_createNetService((char *)device.device.name, (*endpoint)->port, false));
675  }
676  catch (std::exception const &e)
677  {
678  char s[IpEndpointName::ADDRESS_AND_PORT_STRING_LENGTH];
679  (*endpoint)->AddressAndPortAsString(s);
680  VUserLog("Error: %s %s", s, e.what());
681  delete *endpoint;
682  return NULL;
683  }
684  delete *endpoint;
685  }
686 
687 
690 
691  ai->netServices = netServices;
692 
693  ai->device = device.device;
695 
696  ai->sockets = sockets;
697 
698  ai->queue = dispatch_queue_create("org.vuo.osc.send", NULL);
699 
700  return ai;
701 }
703 {
704  for (std::vector<CFNetServiceRef>::iterator netService = ai->netServices.begin(); netService != ai->netServices.end(); ++netService)
705  VuoOsc_destroyNetService(*netService);
706 
707  for (std::vector<UdpTransmitSocket *>::iterator socket = ai->sockets.begin(); socket != ai->sockets.end(); ++socket)
708  delete *socket;
709 
710  VuoOscOut_internalPool->removeSharedInstance(ai->device);
711 
713 
714  dispatch_sync(ai->queue, ^{});
715  dispatch_release(ai->queue);
716 
717  delete ai;
718 }
721 
726 {
727  return (VuoOscOut)VuoOscOut_internalPool->getSharedInstance(device);
728 }
729 
734 {
735  if (!ao || !messages)
736  return;
737 
738  VuoRetain(messages);
740  dispatch_async(aii->queue, ^{
741  bool done = false;
742  int bufferSize = 256;
743  while (!done)
744  {
745  char *buffer = (char *)malloc(bufferSize);
746  try
747  {
748  osc::OutboundPacketStream p(buffer, bufferSize);
749 
750  VuoInteger messageCount = VuoListGetCount_VuoOscMessage(messages);
751 
752  if (messageCount > 1)
753  p << osc::BeginBundleImmediate;
754 
755  for(VuoInteger i = 1; i <= VuoListGetCount_VuoOscMessage(messages); ++i)
756  {
757  VuoOscMessage message = VuoListGetValue_VuoOscMessage(messages, i);
758  if (!message)
759  continue;
760 
761  p << osc::BeginMessage(message->address);
762 
763  for (int i = 0; i < message->dataCount; ++i)
764  {
765  if (message->dataTypes[i] == VuoOscType_Auto)
766  {
767  json_object *datum = message->data[i];
768 
769  json_type type = json_object_get_type(datum);
770 
771  if (type == json_type_null)
772  p << osc::OscNil;
773  else if (type == json_type_boolean)
774  p << (bool)json_object_get_boolean(datum);
775  else if (type == json_type_double)
776  p << json_object_get_double(datum);
777  else if (type == json_type_int)
778  p << (osc::int64)json_object_get_int64(datum);
779  else if (type == json_type_string)
780  p << json_object_get_string(datum);
781  else
782  {
783  VUserLog("Error: Unknown type: %s", json_type_to_name(type));
784  p << osc::OscNil;
785  }
786  }
787  else if (message->dataTypes[i] == VuoOscType_Int32)
788  p << (osc::int32)json_object_get_int(message->data[i]);
789  else if (message->dataTypes[i] == VuoOscType_Float32)
790  p << (float)json_object_get_double(message->data[i]);
791  else
792  {
793  VUserLog("Error: Unknown type: %d", message->dataTypes[i]);
794  p << osc::OscNil;
795  }
796  }
797 
798  p << osc::EndMessage;
799  }
800 
801  if (messageCount > 1)
802  p << osc::EndBundle;
803 
804  for (std::vector<UdpTransmitSocket *>::iterator socket = aii->sockets.begin(); socket != aii->sockets.end(); ++socket)
805  (*socket)->Send(p.Data(), p.Size());
806  done = true;
807  }
808  catch (osc::OutOfBufferMemoryException const &)
809  {
810  bufferSize *= 2;
811  }
812  catch (std::exception const &e)
813  {
814  VUserLog("Error: %s", e.what());
815  done = true;
816  }
817  free(buffer);
818  }
819  VuoRelease(messages);
820  });
821 }