Vuo  2.3.2
VuoTelemetry.c
Go to the documentation of this file.
1 
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include "VuoTelemetry.h"
14 
22 char * vuoCopyStringFromMessage(zmq_msg_t *message)
23 {
24  size_t messageSize = zmq_msg_size(message);
25  if (!messageSize)
26  return NULL;
27  char *string = (char *)malloc(messageSize);
28  memcpy(string, zmq_msg_data(message), messageSize);
29  return string;
30 }
31 
35 void vuoInitMessageWithString(zmq_msg_t *message, const char *string)
36 {
37  size_t messageSize = (string != NULL ? (strlen(string) + 1) : 0);
38  zmq_msg_init_size(message, messageSize);
39  memcpy(zmq_msg_data(message), string, messageSize);
40 }
41 
45 void vuoInitMessageWithInt(zmq_msg_t *message, int value)
46 {
47  size_t messageSize = sizeof(int);
48  zmq_msg_init_size(message, messageSize);
49  memcpy(zmq_msg_data(message), &value, messageSize);
50 }
51 
55 void vuoInitMessageWithBool(zmq_msg_t *message, bool value)
56 {
57  size_t messageSize = sizeof(bool);
58  zmq_msg_init_size(message, messageSize);
59  memcpy(zmq_msg_data(message), &value, messageSize);
60 }
61 
66 {
67  int more;
68  size_t moreSize = sizeof more;
69  zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &moreSize);
70  return more;
71 }
72 
76 char * vuoReceiveAndCopyString(void *socket, char **error)
77 {
78  zmq_msg_t message;
79  zmq_msg_init(&message);
80 
81  if (zmq_msg_recv(&message, socket, 0) == -1)
82  {
83  int e = errno;
84  char *errorMessage;
85  if (e == EAGAIN)
86  errorMessage = strdup("The connection between the composition and runner timed out when trying to receive a message");
87  else
88  {
89  const char *eStr = zmq_strerror(e);
90  const char *format = "The connection between the composition and runner failed when trying to receive a message (%s)";
91  int size = snprintf(NULL, 0, format, eStr);
92  errorMessage = malloc(size+1);
93  snprintf(errorMessage, size+1, format, eStr);
94  }
95  if (error)
96  *error = errorMessage;
97  else
98  {
99  VUserLog("%s", errorMessage);
100  free(errorMessage);
101  }
102  zmq_msg_close(&message);
103  return NULL;
104  }
105 
106  char *string = vuoCopyStringFromMessage(&message);
107  zmq_msg_close(&message);
108  return string;
109 }
110 
114 void vuoReceiveBlocking(void *socket, void *data, size_t dataSize, char **error)
115 {
116  zmq_msg_t message;
117  zmq_msg_init(&message);
118 
119  if (zmq_msg_recv(&message, socket, 0) == -1)
120  {
121  int e = errno;
122  char *errorMessage;
123  if (e == EAGAIN)
124  errorMessage = strdup("The connection between the composition and runner timed out when trying to receive a message");
125  else if (e == ETERM)
126  errorMessage = strdup("The connection between the composition and runner failed because the context was terminated when trying to receive a message");
127  else
128  {
129  const char *eStr = zmq_strerror(e);
130  const char *format = "The connection between the composition and runner failed when trying to receive a message (%s)";
131  int size = snprintf(NULL, 0, format, eStr);
132  errorMessage = malloc(size+1);
133  snprintf(errorMessage, size+1, format, eStr);
134  }
135  if (error)
136  *error = errorMessage;
137  else
138  {
139  VUserLog("%s", errorMessage);
140  free(errorMessage);
141  }
142  zmq_msg_close(&message);
143  bzero(data, dataSize);
144  return;
145  }
146 
147  size_t messageSize = zmq_msg_size(&message);
148  if (messageSize != dataSize)
149  {
150  const char *format = "A wrong-sized message was received in the connection between the composition and runner "
151  "(expected %lu bytes, received %lu bytes)";
152  int size = snprintf(NULL, 0, format, (unsigned long)dataSize, (unsigned long)messageSize);
153  char *errorMessage = malloc(size+1);
154  snprintf(errorMessage, size+1, format, (unsigned long)dataSize, (unsigned long)messageSize);
155  if (error)
156  *error = errorMessage;
157  else
158  {
159  VUserLog("%s", errorMessage);
160  free(errorMessage);
161  }
162  zmq_msg_close(&message);
163  bzero(data, dataSize);
164  return;
165  }
166 
167  memcpy(data, zmq_msg_data(&message), messageSize);
168  zmq_msg_close(&message);
169 }
170 
174 unsigned long vuoReceiveUnsignedInt64(void *socket, char **error)
175 {
176  uint64_t number = 0;
177  vuoReceiveBlocking(socket, (void *)&number, sizeof(number), error);
178  return number;
179 }
180 
184 int vuoReceiveInt(void *socket, char **error)
185 {
186  int number = 0;
187  vuoReceiveBlocking(socket, (void *)&number, sizeof(number), error);
188  return number;
189 }
190 
194 bool vuoReceiveBool(void *socket, char **error)
195 {
196  bool value = false;
197  vuoReceiveBlocking(socket, (void *)&value, sizeof(value), error);
198  return value;
199 }
200 
205 void vuoSend(const char *name, void *socket, int type, zmq_msg_t *messages, unsigned int messageCount, bool isNonBlocking, char **error)
206 {
207  int e = 0;
208 
209  // send the type message-part
210  {
211  zmq_msg_t message;
212  zmq_msg_init_size(&message, sizeof type);
213  memcpy(zmq_msg_data(&message), &type, sizeof type);
214  int flags = (messageCount > 0 ? ZMQ_SNDMORE : 0) | (isNonBlocking ? ZMQ_DONTWAIT : 0);
215  if (zmq_msg_send(&message, socket, flags) == -1)
216  e = errno;
217  zmq_msg_close(&message);
218  }
219 
220  // send the data message-parts
221  for(unsigned int i=0; i<messageCount && e==0; ++i)
222  {
223  int flags = (i < messageCount - 1 ? ZMQ_SNDMORE : 0) | (isNonBlocking ? ZMQ_DONTWAIT : 0);
224  if (zmq_msg_send(&messages[i], socket, flags) == -1)
225  e = errno;
226  }
227 
228  for(unsigned int i=0; i<messageCount; ++i)
229  zmq_msg_close(&messages[i]);
230 
231  if (e != 0)
232  {
233  char *errorMessage;
234  if (e == EAGAIN)
235  {
236  const char *format = "The connection between the composition and runner timed out when trying to send a message of type %i on '%s'";
237  int size = snprintf(NULL, 0, format, type, name);
238  errorMessage = malloc(size+1);
239  snprintf(errorMessage, size+1, format, type, name);
240  }
241  else
242  {
243  const char *eStr = zmq_strerror(e);
244  const char *format = "The connection between the composition and runner failed when trying to send a message of type %i on '%s' (%s)";
245  int size = snprintf(NULL, 0, format, type, name, eStr);
246  errorMessage = malloc(size+1);
247  snprintf(errorMessage, size+1, format, type, name, eStr);
248  }
249  if (error)
250  *error = errorMessage;
251  else
252  {
253  VUserLog("%s", errorMessage);
254  free(errorMessage);
255  }
256  }
257 }
258 
259 #include <libkern/OSAtomic.h>
269 {
270  OSMemoryBarrier();
271 }