|
1 /* GStreamer |
|
2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> |
|
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> |
|
4 * |
|
5 * gsttcp.c: TCP functions |
|
6 * |
|
7 * This library is free software; you can redistribute it and/or |
|
8 * modify it under the terms of the GNU Library General Public |
|
9 * License as published by the Free Software Foundation; either |
|
10 * version 2 of the License, or (at your option) any later version. |
|
11 * |
|
12 * This library is distributed in the hope that it will be useful, |
|
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
15 * Library General Public License for more details. |
|
16 * |
|
17 * You should have received a copy of the GNU Library General Public |
|
18 * License along with this library; if not, write to the |
|
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
|
20 * Boston, MA 02111-1307, USA. |
|
21 */ |
|
22 |
|
23 #ifdef HAVE_CONFIG_H |
|
24 #include "config.h" |
|
25 #endif |
|
26 |
|
27 #include <sys/types.h> |
|
28 #include <sys/socket.h> |
|
29 #include <netinet/in.h> |
|
30 #include <arpa/inet.h> |
|
31 #include <netdb.h> |
|
32 #include <unistd.h> |
|
33 #include <sys/ioctl.h> |
|
34 |
|
35 #ifdef HAVE_FIONREAD_IN_SYS_FILIO |
|
36 #include <sys/filio.h> |
|
37 #endif |
|
38 |
|
39 #include "gsttcp.h" |
|
40 #ifdef __SYMBIAN32__ |
|
41 #include "gst/gst-i18n-plugin.h" |
|
42 #else |
|
43 #include <gst/gst-i18n-plugin.h> |
|
44 #endif |
|
45 |
|
46 GST_DEBUG_CATEGORY_EXTERN (tcp_debug); |
|
47 #define GST_CAT_DEFAULT tcp_debug |
|
48 |
|
49 #ifndef MSG_NOSIGNAL |
|
50 #define MSG_NOSIGNAL 0 |
|
51 #endif |
|
52 |
|
53 /* resolve host to IP address, throwing errors if it fails */ |
|
54 /* host can already be an IP address */ |
|
55 /* returns a newly allocated gchar * with the dotted ip address, |
|
56 or NULL, in which case it already fired an error. */ |
|
57 #ifdef __SYMBIAN32__ |
|
58 EXPORT_C |
|
59 #endif |
|
60 |
|
61 gchar * |
|
62 gst_tcp_host_to_ip (GstElement * element, const gchar * host) |
|
63 { |
|
64 struct hostent *hostinfo; |
|
65 char **addrs; |
|
66 gchar *ip; |
|
67 struct in_addr addr; |
|
68 |
|
69 GST_DEBUG_OBJECT (element, "resolving host %s", host); |
|
70 |
|
71 /* first check if it already is an IP address */ |
|
72 #ifndef __SYMBIAN32__ |
|
73 if (inet_aton (host, &addr)) { |
|
74 #else |
|
75 if ((addr.s_addr=inet_addr(host))==-1){ |
|
76 #endif //__SYMBIAN32__ |
|
77 ip = g_strdup (host); |
|
78 goto beach; |
|
79 } |
|
80 /* FIXME: could do a localhost check here */ |
|
81 |
|
82 /* perform a name lookup */ |
|
83 if (!(hostinfo = gethostbyname (host))) |
|
84 goto resolve_error; |
|
85 |
|
86 if (hostinfo->h_addrtype != AF_INET) |
|
87 goto not_ip; |
|
88 |
|
89 addrs = hostinfo->h_addr_list; |
|
90 |
|
91 /* There could be more than one IP address, but we just return the first */ |
|
92 ip = g_strdup (inet_ntoa (*(struct in_addr *) *addrs)); |
|
93 |
|
94 beach: |
|
95 GST_DEBUG_OBJECT (element, "resolved to IP %s", ip); |
|
96 return ip; |
|
97 |
|
98 resolve_error: |
|
99 { |
|
100 GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL), |
|
101 ("Could not find IP address for host \"%s\".", host)); |
|
102 return NULL; |
|
103 } |
|
104 not_ip: |
|
105 { |
|
106 GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL), |
|
107 ("host \"%s\" is not an IP host", host)); |
|
108 return NULL; |
|
109 } |
|
110 } |
|
111 |
|
112 /* write buffer to given socket incrementally. |
|
113 * Returns number of bytes written. |
|
114 */ |
|
115 #ifdef __SYMBIAN32__ |
|
116 EXPORT_C |
|
117 #endif |
|
118 |
|
119 gint |
|
120 gst_tcp_socket_write (int socket, const void *buf, size_t count) |
|
121 { |
|
122 size_t bytes_written = 0; |
|
123 |
|
124 while (bytes_written < count) { |
|
125 ssize_t wrote = send (socket, (const char *) buf + bytes_written, |
|
126 count - bytes_written, MSG_NOSIGNAL); |
|
127 |
|
128 if (wrote <= 0) { |
|
129 return bytes_written; |
|
130 } |
|
131 bytes_written += wrote; |
|
132 } |
|
133 |
|
134 if (bytes_written < 0) |
|
135 GST_WARNING ("error while writing"); |
|
136 else |
|
137 GST_LOG ("wrote %" G_GSIZE_FORMAT " bytes succesfully", bytes_written); |
|
138 return bytes_written; |
|
139 } |
|
140 |
|
141 /* atomically read count bytes into buf, cancellable. return val of GST_FLOW_OK |
|
142 * indicates success, anything else is failure. |
|
143 */ |
|
144 static GstFlowReturn |
|
145 gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count, |
|
146 GstPoll * fdset) |
|
147 { |
|
148 ssize_t n; |
|
149 size_t bytes_read; |
|
150 int num_to_read; |
|
151 int ret; |
|
152 |
|
153 bytes_read = 0; |
|
154 |
|
155 while (bytes_read < count) { |
|
156 /* do a blocking select on the socket */ |
|
157 /* no action (0) is an error too in our case */ |
|
158 if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) { |
|
159 if (ret == -1 && errno == EBUSY) |
|
160 goto cancelled; |
|
161 else |
|
162 goto select_error; |
|
163 } |
|
164 |
|
165 /* ask how much is available for reading on the socket */ |
|
166 if (ioctl (socket, FIONREAD, &num_to_read) < 0) |
|
167 goto ioctl_error; |
|
168 |
|
169 if (num_to_read == 0) |
|
170 goto got_eos; |
|
171 |
|
172 /* sizeof(ssize_t) >= sizeof(int), so I know num_to_read <= SSIZE_MAX */ |
|
173 |
|
174 num_to_read = MIN (num_to_read, count - bytes_read); |
|
175 |
|
176 n = read (socket, ((guint8 *) buf) + bytes_read, num_to_read); |
|
177 |
|
178 if (n < 0) |
|
179 goto read_error; |
|
180 |
|
181 if (n < num_to_read) |
|
182 goto short_read; |
|
183 |
|
184 bytes_read += num_to_read; |
|
185 } |
|
186 |
|
187 return GST_FLOW_OK; |
|
188 |
|
189 /* ERRORS */ |
|
190 select_error: |
|
191 { |
|
192 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
193 ("select failed: %s", g_strerror (errno))); |
|
194 return GST_FLOW_ERROR; |
|
195 } |
|
196 cancelled: |
|
197 { |
|
198 GST_DEBUG_OBJECT (this, "Select was cancelled"); |
|
199 return GST_FLOW_WRONG_STATE; |
|
200 } |
|
201 ioctl_error: |
|
202 { |
|
203 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
204 ("ioctl failed: %s", g_strerror (errno))); |
|
205 return GST_FLOW_ERROR; |
|
206 } |
|
207 got_eos: |
|
208 { |
|
209 GST_DEBUG_OBJECT (this, "Got EOS on socket stream"); |
|
210 return GST_FLOW_UNEXPECTED; |
|
211 } |
|
212 read_error: |
|
213 { |
|
214 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
215 ("read failed: %s", g_strerror (errno))); |
|
216 return GST_FLOW_ERROR; |
|
217 } |
|
218 short_read: |
|
219 { |
|
220 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
221 ("short read: wanted %d bytes, got %" G_GSSIZE_FORMAT, num_to_read, n)); |
|
222 return GST_FLOW_ERROR; |
|
223 } |
|
224 } |
|
225 |
|
226 /* close the socket and reset the fd. Used to clean up after errors. */ |
|
227 #ifdef __SYMBIAN32__ |
|
228 EXPORT_C |
|
229 #endif |
|
230 |
|
231 void |
|
232 gst_tcp_socket_close (GstPollFD * socket) |
|
233 { |
|
234 if (socket->fd >= 0) { |
|
235 close (socket->fd); |
|
236 socket->fd = -1; |
|
237 } |
|
238 } |
|
239 |
|
240 /* read a buffer from the given socket |
|
241 * returns: |
|
242 * - a GstBuffer in which data should be read |
|
243 * - NULL, indicating a connection close or an error, to be handled with |
|
244 * EOS |
|
245 */ |
|
246 #ifdef __SYMBIAN32__ |
|
247 EXPORT_C |
|
248 #endif |
|
249 |
|
250 GstFlowReturn |
|
251 gst_tcp_read_buffer (GstElement * this, int socket, GstPoll * fdset, |
|
252 GstBuffer ** buf) |
|
253 { |
|
254 int ret; |
|
255 ssize_t bytes_read; |
|
256 int readsize; |
|
257 |
|
258 *buf = NULL; |
|
259 |
|
260 /* do a blocking select on the socket */ |
|
261 /* no action (0) is an error too in our case */ |
|
262 if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) { |
|
263 if (ret == -1 && errno == EBUSY) |
|
264 goto cancelled; |
|
265 else |
|
266 goto select_error; |
|
267 } |
|
268 |
|
269 /* ask how much is available for reading on the socket */ |
|
270 if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0) |
|
271 goto ioctl_error; |
|
272 |
|
273 if (readsize == 0) |
|
274 goto got_eos; |
|
275 |
|
276 /* sizeof(ssize_t) >= sizeof(int), so I know readsize <= SSIZE_MAX */ |
|
277 |
|
278 *buf = gst_buffer_new_and_alloc (readsize); |
|
279 |
|
280 bytes_read = read (socket, GST_BUFFER_DATA (*buf), readsize); |
|
281 |
|
282 if (bytes_read < 0) |
|
283 goto read_error; |
|
284 |
|
285 if (bytes_read < readsize) |
|
286 /* but mom, you promised to give me readsize bytes! */ |
|
287 goto short_read; |
|
288 |
|
289 GST_LOG_OBJECT (this, "returning buffer of size %d", GST_BUFFER_SIZE (*buf)); |
|
290 return GST_FLOW_OK; |
|
291 |
|
292 /* ERRORS */ |
|
293 select_error: |
|
294 { |
|
295 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
296 ("select failed: %s", g_strerror (errno))); |
|
297 return GST_FLOW_ERROR; |
|
298 } |
|
299 cancelled: |
|
300 { |
|
301 GST_DEBUG_OBJECT (this, "Select was cancelled"); |
|
302 return GST_FLOW_WRONG_STATE; |
|
303 } |
|
304 ioctl_error: |
|
305 { |
|
306 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
307 ("ioctl failed: %s", g_strerror (errno))); |
|
308 return GST_FLOW_ERROR; |
|
309 } |
|
310 got_eos: |
|
311 { |
|
312 GST_DEBUG_OBJECT (this, "Got EOS on socket stream"); |
|
313 return GST_FLOW_WRONG_STATE; |
|
314 } |
|
315 read_error: |
|
316 { |
|
317 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
318 ("read failed: %s", g_strerror (errno))); |
|
319 gst_buffer_unref (*buf); |
|
320 *buf = NULL; |
|
321 return GST_FLOW_ERROR; |
|
322 } |
|
323 short_read: |
|
324 { |
|
325 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
326 ("short read: wanted %d bytes, got %" G_GSSIZE_FORMAT, readsize, |
|
327 bytes_read)); |
|
328 gst_buffer_unref (*buf); |
|
329 *buf = NULL; |
|
330 return GST_FLOW_ERROR; |
|
331 } |
|
332 } |
|
333 |
|
334 /* read a buffer from the given socket |
|
335 * returns: |
|
336 * - a GstBuffer in which data should be read |
|
337 * - NULL, indicating a connection close or an error, to be handled with |
|
338 * EOS |
|
339 */ |
|
340 #ifdef __SYMBIAN32__ |
|
341 EXPORT_C |
|
342 #endif |
|
343 |
|
344 GstFlowReturn |
|
345 gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset, |
|
346 GstBuffer ** buf) |
|
347 { |
|
348 GstFlowReturn ret; |
|
349 guint8 *header = NULL; |
|
350 |
|
351 GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", |
|
352 GST_DP_HEADER_LENGTH); |
|
353 |
|
354 *buf = NULL; |
|
355 header = g_malloc (GST_DP_HEADER_LENGTH); |
|
356 |
|
357 ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset); |
|
358 |
|
359 if (ret != GST_FLOW_OK) |
|
360 goto header_read_error; |
|
361 |
|
362 if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header)) |
|
363 goto validate_error; |
|
364 |
|
365 if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_BUFFER) |
|
366 goto is_not_buffer; |
|
367 |
|
368 GST_LOG_OBJECT (this, "validated buffer packet header"); |
|
369 |
|
370 *buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, header); |
|
371 |
|
372 g_free (header); |
|
373 |
|
374 ret = gst_tcp_socket_read (this, socket, GST_BUFFER_DATA (*buf), |
|
375 GST_BUFFER_SIZE (*buf), fdset); |
|
376 |
|
377 if (ret != GST_FLOW_OK) |
|
378 goto data_read_error; |
|
379 |
|
380 return GST_FLOW_OK; |
|
381 |
|
382 /* ERRORS */ |
|
383 header_read_error: |
|
384 { |
|
385 g_free (header); |
|
386 return ret; |
|
387 } |
|
388 validate_error: |
|
389 { |
|
390 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
391 ("GDP buffer packet header does not validate")); |
|
392 g_free (header); |
|
393 return GST_FLOW_ERROR; |
|
394 } |
|
395 is_not_buffer: |
|
396 { |
|
397 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
398 ("GDP packet contains something that is not a buffer (type %d)", |
|
399 gst_dp_header_payload_type (header))); |
|
400 g_free (header); |
|
401 return GST_FLOW_ERROR; |
|
402 } |
|
403 data_read_error: |
|
404 { |
|
405 gst_buffer_unref (*buf); |
|
406 *buf = NULL; |
|
407 return ret; |
|
408 } |
|
409 } |
|
410 #ifdef __SYMBIAN32__ |
|
411 EXPORT_C |
|
412 #endif |
|
413 |
|
414 |
|
415 GstFlowReturn |
|
416 gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset, |
|
417 GstCaps ** caps) |
|
418 { |
|
419 GstFlowReturn ret; |
|
420 guint8 *header = NULL; |
|
421 guint8 *payload = NULL; |
|
422 size_t payload_length; |
|
423 |
|
424 GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", |
|
425 GST_DP_HEADER_LENGTH); |
|
426 |
|
427 *caps = NULL; |
|
428 header = g_malloc (GST_DP_HEADER_LENGTH); |
|
429 |
|
430 ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset); |
|
431 |
|
432 if (ret != GST_FLOW_OK) |
|
433 goto header_read_error; |
|
434 |
|
435 if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header)) |
|
436 goto header_validate_error; |
|
437 |
|
438 if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS) |
|
439 goto is_not_caps; |
|
440 |
|
441 GST_LOG_OBJECT (this, "validated caps packet header"); |
|
442 |
|
443 payload_length = gst_dp_header_payload_length (header); |
|
444 payload = g_malloc (payload_length); |
|
445 |
|
446 GST_LOG_OBJECT (this, |
|
447 "Reading %" G_GSIZE_FORMAT " bytes for caps packet payload", |
|
448 payload_length); |
|
449 |
|
450 ret = gst_tcp_socket_read (this, socket, payload, payload_length, fdset); |
|
451 |
|
452 if (ret != GST_FLOW_OK) |
|
453 goto payload_read_error; |
|
454 |
|
455 if (!gst_dp_validate_payload (GST_DP_HEADER_LENGTH, header, payload)) |
|
456 goto payload_validate_error; |
|
457 |
|
458 *caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, header, payload); |
|
459 |
|
460 GST_DEBUG_OBJECT (this, "Got caps over GDP: %" GST_PTR_FORMAT, *caps); |
|
461 |
|
462 g_free (header); |
|
463 g_free (payload); |
|
464 |
|
465 return GST_FLOW_OK; |
|
466 |
|
467 /* ERRORS */ |
|
468 header_read_error: |
|
469 { |
|
470 g_free (header); |
|
471 return ret; |
|
472 } |
|
473 header_validate_error: |
|
474 { |
|
475 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
476 ("GDP caps packet header does not validate")); |
|
477 g_free (header); |
|
478 return GST_FLOW_ERROR; |
|
479 } |
|
480 is_not_caps: |
|
481 { |
|
482 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
483 ("GDP packet contains something that is not a caps (type %d)", |
|
484 gst_dp_header_payload_type (header))); |
|
485 g_free (header); |
|
486 return GST_FLOW_ERROR; |
|
487 } |
|
488 payload_read_error: |
|
489 { |
|
490 g_free (header); |
|
491 g_free (payload); |
|
492 return ret; |
|
493 } |
|
494 payload_validate_error: |
|
495 { |
|
496 GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), |
|
497 ("GDP caps packet payload does not validate")); |
|
498 g_free (header); |
|
499 g_free (payload); |
|
500 return GST_FLOW_ERROR; |
|
501 } |
|
502 } |
|
503 |
|
504 /* write a GDP header to the socket. Return false if fails. */ |
|
505 #ifdef __SYMBIAN32__ |
|
506 EXPORT_C |
|
507 #endif |
|
508 |
|
509 gboolean |
|
510 gst_tcp_gdp_write_buffer (GstElement * this, int socket, GstBuffer * buffer, |
|
511 gboolean fatal, const gchar * host, int port) |
|
512 { |
|
513 guint length; |
|
514 guint8 *header; |
|
515 size_t wrote; |
|
516 |
|
517 if (!gst_dp_header_from_buffer (buffer, 0, &length, &header)) |
|
518 goto create_error; |
|
519 |
|
520 GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length); |
|
521 wrote = gst_tcp_socket_write (socket, header, length); |
|
522 g_free (header); |
|
523 |
|
524 if (wrote != length) |
|
525 goto write_error; |
|
526 |
|
527 return TRUE; |
|
528 |
|
529 /* ERRORS */ |
|
530 create_error: |
|
531 { |
|
532 if (fatal) |
|
533 GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL), |
|
534 ("Could not create GDP header from buffer")); |
|
535 return FALSE; |
|
536 } |
|
537 write_error: |
|
538 { |
|
539 if (fatal) |
|
540 GST_ELEMENT_ERROR (this, RESOURCE, WRITE, |
|
541 (_("Error while sending data to \"%s:%d\"."), host, port), |
|
542 ("Only %" G_GSIZE_FORMAT " of %u bytes written: %s", |
|
543 wrote, GST_BUFFER_SIZE (buffer), g_strerror (errno))); |
|
544 return FALSE; |
|
545 } |
|
546 } |
|
547 |
|
548 /* write GDP header and payload to the given socket for the given caps. |
|
549 * Return false if fails. */ |
|
550 #ifdef __SYMBIAN32__ |
|
551 EXPORT_C |
|
552 #endif |
|
553 |
|
554 gboolean |
|
555 gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps, |
|
556 gboolean fatal, const char *host, int port) |
|
557 { |
|
558 guint length; |
|
559 guint8 *header; |
|
560 guint8 *payload; |
|
561 size_t wrote; |
|
562 |
|
563 if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) |
|
564 goto create_error; |
|
565 |
|
566 GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length); |
|
567 wrote = gst_tcp_socket_write (socket, header, length); |
|
568 if (wrote != length) |
|
569 goto write_header_error; |
|
570 |
|
571 length = gst_dp_header_payload_length (header); |
|
572 g_free (header); |
|
573 |
|
574 GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length); |
|
575 wrote = gst_tcp_socket_write (socket, payload, length); |
|
576 g_free (payload); |
|
577 |
|
578 if (wrote != length) |
|
579 goto write_payload_error; |
|
580 |
|
581 return TRUE; |
|
582 |
|
583 /* ERRORS */ |
|
584 create_error: |
|
585 { |
|
586 if (fatal) |
|
587 GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL), |
|
588 ("Could not create GDP packet from caps")); |
|
589 return FALSE; |
|
590 } |
|
591 write_header_error: |
|
592 { |
|
593 g_free (header); |
|
594 g_free (payload); |
|
595 if (fatal) |
|
596 GST_ELEMENT_ERROR (this, RESOURCE, WRITE, |
|
597 (_("Error while sending gdp header data to \"%s:%d\"."), host, port), |
|
598 ("Only %" G_GSIZE_FORMAT " of %u bytes written: %s", |
|
599 wrote, length, g_strerror (errno))); |
|
600 return FALSE; |
|
601 } |
|
602 write_payload_error: |
|
603 { |
|
604 if (fatal) |
|
605 GST_ELEMENT_ERROR (this, RESOURCE, WRITE, |
|
606 (_("Error while sending gdp payload data to \"%s:%d\"."), host, port), |
|
607 ("Only %" G_GSIZE_FORMAT " of %u bytes written: %s", |
|
608 wrote, length, g_strerror (errno))); |
|
609 return FALSE; |
|
610 } |
|
611 } |