Vuo  2.4.0
VuoTelemetry.cc
Go to the documentation of this file.
1
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include "VuoTelemetry.hh"
14
22extern "C" char * vuoCopyStringFromMessage(zmq_msg_t *message)
23{
24 size_t messageSize = zmq_msg_size(message);
25 if (!messageSize)
26 return NULL;
27 char *string = (char *)malloc(messageSize);
28 memcpy(string, zmq_msg_data(message), messageSize);
29 return string;
30}
31
35extern "C" void vuoInitMessageWithString(zmq_msg_t *message, const char *string)
36{
37 size_t messageSize = (string != NULL ? (strlen(string) + 1) : 0);
38 zmq_msg_init_size(message, messageSize);
39 memcpy(zmq_msg_data(message), string, messageSize);
40}
41
45extern "C" void vuoInitMessageWithInt(zmq_msg_t *message, int value)
46{
47 size_t messageSize = sizeof(int);
48 zmq_msg_init_size(message, messageSize);
49 memcpy(zmq_msg_data(message), &value, messageSize);
50}
51
55extern "C" void vuoInitMessageWithBool(zmq_msg_t *message, bool value)
56{
57 size_t messageSize = sizeof(bool);
58 zmq_msg_init_size(message, messageSize);
59 memcpy(zmq_msg_data(message), &value, messageSize);
60}
61
65extern "C" bool VuoTelemetry_hasMoreToReceive(void *socket)
66{
67 int more;
68 size_t moreSize = sizeof more;
69 zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &moreSize);
70 return more;
71}
72
76extern "C" char * vuoReceiveAndCopyString(void *socket, char **error)
77{
78 zmq_msg_t message;
79 zmq_msg_init(&message);
80
81 if (zmq_msg_recv(&message, socket, 0) == -1)
82 {
83 int e = errno;
84 char *errorMessage;
85 if (e == EAGAIN)
86 errorMessage = strdup("The connection between the composition and runner timed out when trying to receive a message");
87 else
88 {
89 const char *eStr = zmq_strerror(e);
90 const char *format = "The connection between the composition and runner failed when trying to receive a message (%s)";
91 int size = snprintf(NULL, 0, format, eStr);
92 errorMessage = (char *)malloc(size+1);
93 snprintf(errorMessage, size+1, format, eStr);
94 }
95 if (error)
96 *error = errorMessage;
97 else
98 {
99 VUserLog("%s", errorMessage);
100 free(errorMessage);
101 }
102 zmq_msg_close(&message);
103 return NULL;
104 }
105
106 char *string = vuoCopyStringFromMessage(&message);
107 zmq_msg_close(&message);
108 return string;
109}
110
114void vuoReceiveBlocking(void *socket, void *data, size_t dataSize, char **error)
115{
116 zmq_msg_t message;
117 zmq_msg_init(&message);
118
119 if (zmq_msg_recv(&message, socket, 0) == -1)
120 {
121 int e = errno;
122 char *errorMessage;
123 if (e == EAGAIN)
124 errorMessage = strdup("The connection between the composition and runner timed out when trying to receive a message");
125 else if (e == ETERM)
126 errorMessage = strdup("The connection between the composition and runner failed because the context was terminated when trying to receive a message");
127 else
128 {
129 const char *eStr = zmq_strerror(e);
130 const char *format = "The connection between the composition and runner failed when trying to receive a message (%s)";
131 int size = snprintf(NULL, 0, format, eStr);
132 errorMessage = (char *)malloc(size+1);
133 snprintf(errorMessage, size+1, format, eStr);
134 }
135 if (error)
136 *error = errorMessage;
137 else
138 {
139 VUserLog("%s", errorMessage);
140 free(errorMessage);
141 }
142 zmq_msg_close(&message);
143 bzero(data, dataSize);
144 return;
145 }
146
147 size_t messageSize = zmq_msg_size(&message);
148 if (messageSize != dataSize)
149 {
150 const char *format = "A wrong-sized message was received in the connection between the composition and runner "
151 "(expected %lu bytes, received %lu bytes)";
152 int size = snprintf(NULL, 0, format, (unsigned long)dataSize, (unsigned long)messageSize);
153 char *errorMessage = (char *)malloc(size+1);
154 snprintf(errorMessage, size+1, format, (unsigned long)dataSize, (unsigned long)messageSize);
155 if (error)
156 *error = errorMessage;
157 else
158 {
159 VUserLog("%s", errorMessage);
160 free(errorMessage);
161 }
162 zmq_msg_close(&message);
163 bzero(data, dataSize);
164 return;
165 }
166
167 memcpy(data, zmq_msg_data(&message), messageSize);
168 zmq_msg_close(&message);
169}
170
174extern "C" unsigned long vuoReceiveUnsignedInt64(void *socket, char **error)
175{
176 uint64_t number = 0;
177 vuoReceiveBlocking(socket, (void *)&number, sizeof(number), error);
178 return number;
179}
180
184extern "C" int vuoReceiveInt(void *socket, char **error)
185{
186 int number = 0;
187 vuoReceiveBlocking(socket, (void *)&number, sizeof(number), error);
188 return number;
189}
190
194extern "C" bool vuoReceiveBool(void *socket, char **error)
195{
196 bool value = false;
197 vuoReceiveBlocking(socket, (void *)&value, sizeof(value), error);
198 return value;
199}
200
205extern "C" void vuoSend(const char *name, void *socket, int type, zmq_msg_t *messages, unsigned int messageCount, bool isNonBlocking, char **error)
206{
207 int e = 0;
208
209 // send the type message-part
210 {
211 zmq_msg_t message;
212 zmq_msg_init_size(&message, sizeof type);
213 memcpy(zmq_msg_data(&message), &type, sizeof type);
214 int flags = (messageCount > 0 ? ZMQ_SNDMORE : 0) | (isNonBlocking ? ZMQ_DONTWAIT : 0);
215 if (zmq_msg_send(&message, socket, flags) == -1)
216 e = errno;
217 zmq_msg_close(&message);
218 }
219
220 // send the data message-parts
221 for(unsigned int i=0; i<messageCount && e==0; ++i)
222 {
223 int flags = (i < messageCount - 1 ? ZMQ_SNDMORE : 0) | (isNonBlocking ? ZMQ_DONTWAIT : 0);
224 if (zmq_msg_send(&messages[i], socket, flags) == -1)
225 e = errno;
226 }
227
228 for(unsigned int i=0; i<messageCount; ++i)
229 zmq_msg_close(&messages[i]);
230
231 if (e != 0)
232 {
233 char *errorMessage;
234 if (e == EAGAIN)
235 {
236 const char *format = "The connection between the composition and runner timed out when trying to send a message of type %i on '%s'";
237 int size = snprintf(NULL, 0, format, type, name);
238 errorMessage = (char *)malloc(size+1);
239 snprintf(errorMessage, size+1, format, type, name);
240 }
241 else
242 {
243 const char *eStr = zmq_strerror(e);
244 const char *format = "The connection between the composition and runner failed when trying to send a message of type %i on '%s' (%s)";
245 int size = snprintf(NULL, 0, format, type, name, eStr);
246 errorMessage = (char *)malloc(size+1);
247 snprintf(errorMessage, size+1, format, type, name, eStr);
248 }
249 if (error)
250 *error = errorMessage;
251 else
252 {
253 VUserLog("%s", errorMessage);
254 free(errorMessage);
255 }
256 }
257}
258
259#include <atomic>
/**
 * Issues a sequentially-consistent memory fence.
 *
 * Exposed with C linkage so that it can be called from C code.
 * The `<atomic>` names are qualified with `std::` so this compiles
 * without depending on a using-directive from another header.
 */
extern "C" void vuoMemoryBarrier(void)
{
	std::atomic_thread_fence(std::memory_order_seq_cst);
}