gst_plugins_base/gst/tcp/gsttcpclientsrc.c
changeset 2 5505e8908944
parent 0 0e761a78d257
child 7 567bb019e3e3
equal deleted inserted replaced
1:4c282e7dd6d3 2:5505e8908944
       
     1 /* GStreamer
       
     2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
       
     3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
       
     4  *
       
     5  * This library is free software; you can redistribute it and/or
       
     6  * modify it under the terms of the GNU Library General Public
       
     7  * License as published by the Free Software Foundation; either
       
     8  * version 2 of the License, or (at your option) any later version.
       
     9  *
       
    10  * This library is distributed in the hope that it will be useful,
       
    11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    13  * Library General Public License for more details.
       
    14  *
       
    15  * You should have received a copy of the GNU Library General Public
       
    16  * License along with this library; if not, write to the
       
    17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
       
    18  * Boston, MA 02111-1307, USA.
       
    19  */
       
    20 
       
    21 
       
    22 #ifdef HAVE_CONFIG_H
       
    23 #include "config.h"
       
    24 #endif
       
    25 
       
    26 #ifdef __SYMBIAN32__
       
    27 #include "gst/gst-i18n-plugin.h"
       
    28 #else
       
    29 #include <gst/gst-i18n-plugin.h>
       
    30 #endif
       
    31 #include "gsttcp.h"
       
    32 #include "gsttcpclientsrc.h"
       
    33 #include <string.h>             /* memset */
       
    34 #include <unistd.h>
       
    35 #include <arpa/inet.h>
       
    36 #include <fcntl.h>
       
    37 
       
    38 
       
    39 GST_DEBUG_CATEGORY_STATIC (tcpclientsrc_debug);
       
    40 #define GST_CAT_DEFAULT tcpclientsrc_debug
       
    41 
       
    42 #define MAX_READ_SIZE                   4 * 1024
       
    43 
       
    44 
       
    45 static const GstElementDetails gst_tcp_client_src_details =
       
    46 GST_ELEMENT_DETAILS ("TCP client source",
       
    47     "Source/Network",
       
    48     "Receive data as a client over the network via TCP",
       
    49     "Thomas Vander Stichele <thomas at apestaart dot org>");
       
    50 
       
    51 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
       
    52     GST_PAD_SRC,
       
    53     GST_PAD_ALWAYS,
       
    54     GST_STATIC_CAPS_ANY);
       
    55 
       
    56 
       
    57 enum
       
    58 {
       
    59   PROP_0,
       
    60   PROP_HOST,
       
    61   PROP_PORT,
       
    62   PROP_PROTOCOL
       
    63 };
       
    64 
       
    65 
       
    66 GST_BOILERPLATE (GstTCPClientSrc, gst_tcp_client_src, GstPushSrc,
       
    67     GST_TYPE_PUSH_SRC);
       
    68 
       
    69 
       
    70 static void gst_tcp_client_src_finalize (GObject * gobject);
       
    71 
       
    72 static GstCaps *gst_tcp_client_src_getcaps (GstBaseSrc * psrc);
       
    73 
       
    74 static GstFlowReturn gst_tcp_client_src_create (GstPushSrc * psrc,
       
    75     GstBuffer ** outbuf);
       
    76 static gboolean gst_tcp_client_src_stop (GstBaseSrc * bsrc);
       
    77 static gboolean gst_tcp_client_src_start (GstBaseSrc * bsrc);
       
    78 static gboolean gst_tcp_client_src_unlock (GstBaseSrc * bsrc);
       
    79 
       
    80 static void gst_tcp_client_src_set_property (GObject * object, guint prop_id,
       
    81     const GValue * value, GParamSpec * pspec);
       
    82 static void gst_tcp_client_src_get_property (GObject * object, guint prop_id,
       
    83     GValue * value, GParamSpec * pspec);
       
    84 
       
    85 
       
    86 static void
       
    87 gst_tcp_client_src_base_init (gpointer g_class)
       
    88 {
       
    89   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
       
    90 
       
    91   gst_element_class_add_pad_template (element_class,
       
    92       gst_static_pad_template_get (&srctemplate));
       
    93 
       
    94   gst_element_class_set_details (element_class, &gst_tcp_client_src_details);
       
    95 }
       
    96 
       
    97 static void
       
    98 gst_tcp_client_src_class_init (GstTCPClientSrcClass * klass)
       
    99 {
       
   100   GObjectClass *gobject_class;
       
   101   GstBaseSrcClass *gstbasesrc_class;
       
   102   GstPushSrcClass *gstpush_src_class;
       
   103 
       
   104   gobject_class = (GObjectClass *) klass;
       
   105   gstbasesrc_class = (GstBaseSrcClass *) klass;
       
   106   gstpush_src_class = (GstPushSrcClass *) klass;
       
   107 
       
   108   gobject_class->set_property = gst_tcp_client_src_set_property;
       
   109   gobject_class->get_property = gst_tcp_client_src_get_property;
       
   110   gobject_class->finalize = gst_tcp_client_src_finalize;
       
   111 
       
   112   g_object_class_install_property (gobject_class, PROP_HOST,
       
   113       g_param_spec_string ("host", "Host",
       
   114           "The host IP address to receive packets from", TCP_DEFAULT_HOST,
       
   115           G_PARAM_READWRITE));
       
   116   g_object_class_install_property (gobject_class, PROP_PORT,
       
   117       g_param_spec_int ("port", "Port", "The port to receive packets from", 0,
       
   118           TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
       
   119   g_object_class_install_property (gobject_class, PROP_PROTOCOL,
       
   120       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
       
   121           GST_TYPE_TCP_PROTOCOL, GST_TCP_PROTOCOL_NONE, G_PARAM_READWRITE));
       
   122 
       
   123   gstbasesrc_class->get_caps = gst_tcp_client_src_getcaps;
       
   124   gstbasesrc_class->start = gst_tcp_client_src_start;
       
   125   gstbasesrc_class->stop = gst_tcp_client_src_stop;
       
   126   gstbasesrc_class->unlock = gst_tcp_client_src_unlock;
       
   127 
       
   128   gstpush_src_class->create = gst_tcp_client_src_create;
       
   129 
       
   130   GST_DEBUG_CATEGORY_INIT (tcpclientsrc_debug, "tcpclientsrc", 0,
       
   131       "TCP Client Source");
       
   132 }
       
   133 
       
   134 static void
       
   135 gst_tcp_client_src_init (GstTCPClientSrc * this, GstTCPClientSrcClass * g_class)
       
   136 {
       
   137   this->port = TCP_DEFAULT_PORT;
       
   138   this->host = g_strdup (TCP_DEFAULT_HOST);
       
   139   this->sock_fd.fd = -1;
       
   140   this->protocol = GST_TCP_PROTOCOL_NONE;
       
   141   this->caps = NULL;
       
   142 
       
   143   gst_base_src_set_live (GST_BASE_SRC (this), TRUE);
       
   144 
       
   145   GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN);
       
   146 }
       
   147 
       
   148 static void
       
   149 gst_tcp_client_src_finalize (GObject * gobject)
       
   150 {
       
   151   GstTCPClientSrc *this = GST_TCP_CLIENT_SRC (gobject);
       
   152 
       
   153   g_free (this->host);
       
   154 
       
   155   G_OBJECT_CLASS (parent_class)->finalize (gobject);
       
   156 }
       
   157 
       
   158 static GstCaps *
       
   159 gst_tcp_client_src_getcaps (GstBaseSrc * bsrc)
       
   160 {
       
   161   GstTCPClientSrc *src;
       
   162   GstCaps *caps = NULL;
       
   163 
       
   164   src = GST_TCP_CLIENT_SRC (bsrc);
       
   165 
       
   166   if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_CLIENT_SRC_OPEN))
       
   167     caps = gst_caps_new_any ();
       
   168   else if (src->caps)
       
   169     caps = gst_caps_copy (src->caps);
       
   170   else
       
   171     caps = gst_caps_new_any ();
       
   172   GST_DEBUG_OBJECT (src, "returning caps %" GST_PTR_FORMAT, caps);
       
   173   g_assert (GST_IS_CAPS (caps));
       
   174   return caps;
       
   175 }
       
   176 
       
   177 static GstFlowReturn
       
   178 gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
       
   179 {
       
   180   GstTCPClientSrc *src;
       
   181   GstFlowReturn ret = GST_FLOW_OK;
       
   182 
       
   183   src = GST_TCP_CLIENT_SRC (psrc);
       
   184 
       
   185   if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_CLIENT_SRC_OPEN))
       
   186     goto wrong_state;
       
   187 
       
   188   GST_LOG_OBJECT (src, "asked for a buffer");
       
   189 
       
   190   /* read the buffer header if we're using a protocol */
       
   191   switch (src->protocol) {
       
   192     case GST_TCP_PROTOCOL_NONE:
       
   193       ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
       
   194           src->fdset, outbuf);
       
   195       break;
       
   196 
       
   197     case GST_TCP_PROTOCOL_GDP:
       
   198       /* get the caps if we're using GDP */
       
   199       if (!src->caps_received) {
       
   200         GstCaps *caps;
       
   201 
       
   202         GST_DEBUG_OBJECT (src, "getting caps through GDP");
       
   203         ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd.fd,
       
   204             src->fdset, &caps);
       
   205 
       
   206         if (ret != GST_FLOW_OK)
       
   207           goto no_caps;
       
   208 
       
   209         src->caps_received = TRUE;
       
   210         src->caps = caps;
       
   211       }
       
   212 
       
   213       ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
       
   214           src->fdset, outbuf);
       
   215       break;
       
   216     default:
       
   217       /* need to assert as buf == NULL */
       
   218       g_assert ("Unhandled protocol type");
       
   219       break;
       
   220   }
       
   221 
       
   222   if (ret == GST_FLOW_OK) {
       
   223     GST_LOG_OBJECT (src,
       
   224         "Returning buffer from _get of size %d, ts %"
       
   225         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
       
   226         ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
       
   227         GST_BUFFER_SIZE (*outbuf),
       
   228         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
       
   229         GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
       
   230         GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
       
   231 
       
   232     gst_buffer_set_caps (*outbuf, src->caps);
       
   233   }
       
   234 
       
   235   return ret;
       
   236 
       
   237 wrong_state:
       
   238   {
       
   239     GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
       
   240     return GST_FLOW_WRONG_STATE;
       
   241   }
       
   242 no_caps:
       
   243   {
       
   244     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
       
   245         ("Could not read caps through GDP"));
       
   246     return ret;
       
   247   }
       
   248 }
       
   249 
       
   250 static void
       
   251 gst_tcp_client_src_set_property (GObject * object, guint prop_id,
       
   252     const GValue * value, GParamSpec * pspec)
       
   253 {
       
   254   GstTCPClientSrc *tcpclientsrc = GST_TCP_CLIENT_SRC (object);
       
   255 
       
   256   switch (prop_id) {
       
   257     case PROP_HOST:
       
   258       if (!g_value_get_string (value)) {
       
   259         g_warning ("host property cannot be NULL");
       
   260         break;
       
   261       }
       
   262       g_free (tcpclientsrc->host);
       
   263       tcpclientsrc->host = g_strdup (g_value_get_string (value));
       
   264       break;
       
   265     case PROP_PORT:
       
   266       tcpclientsrc->port = g_value_get_int (value);
       
   267       break;
       
   268     case PROP_PROTOCOL:
       
   269       tcpclientsrc->protocol = g_value_get_enum (value);
       
   270       break;
       
   271 
       
   272     default:
       
   273       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       
   274       break;
       
   275   }
       
   276 }
       
   277 
       
   278 static void
       
   279 gst_tcp_client_src_get_property (GObject * object, guint prop_id,
       
   280     GValue * value, GParamSpec * pspec)
       
   281 {
       
   282   GstTCPClientSrc *tcpclientsrc = GST_TCP_CLIENT_SRC (object);
       
   283 
       
   284   switch (prop_id) {
       
   285     case PROP_HOST:
       
   286       g_value_set_string (value, tcpclientsrc->host);
       
   287       break;
       
   288     case PROP_PORT:
       
   289       g_value_set_int (value, tcpclientsrc->port);
       
   290       break;
       
   291     case PROP_PROTOCOL:
       
   292       g_value_set_enum (value, tcpclientsrc->protocol);
       
   293       break;
       
   294 
       
   295     default:
       
   296       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       
   297       break;
       
   298   }
       
   299 }
       
   300 
       
   301 /* create a socket for connecting to remote server */
       
   302 static gboolean
       
   303 gst_tcp_client_src_start (GstBaseSrc * bsrc)
       
   304 {
       
   305   int ret;
       
   306   gchar *ip;
       
   307   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
       
   308 
       
   309   if ((src->fdset = gst_poll_new (TRUE)) == NULL)
       
   310     goto socket_pair;
       
   311 
       
   312   /* create receiving client socket */
       
   313   GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
       
   314       src->host, src->port);
       
   315 
       
   316   if ((src->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
       
   317     goto no_socket;
       
   318 
       
   319   GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d",
       
   320       src->sock_fd.fd);
       
   321   GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN);
       
   322 
       
   323   /* look up name if we need to */
       
   324   if (!(ip = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host)))
       
   325     goto name_resolv;
       
   326 
       
   327   GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
       
   328 
       
   329   /* connect to server */
       
   330   memset (&src->server_sin, 0, sizeof (src->server_sin));
       
   331   src->server_sin.sin_family = AF_INET; /* network socket */
       
   332   src->server_sin.sin_port = htons (src->port); /* on port */
       
   333   src->server_sin.sin_addr.s_addr = inet_addr (ip);     /* on host ip */
       
   334   g_free (ip);
       
   335 
       
   336   GST_DEBUG_OBJECT (src, "connecting to server");
       
   337   ret = connect (src->sock_fd.fd, (struct sockaddr *) &src->server_sin,
       
   338       sizeof (src->server_sin));
       
   339 
       
   340   if (ret) {
       
   341     gst_tcp_client_src_stop (GST_BASE_SRC (src));
       
   342     switch (errno) {
       
   343       case ECONNREFUSED:
       
   344         GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ,
       
   345             (_("Connection to %s:%d refused."), src->host, src->port), (NULL));
       
   346         return FALSE;
       
   347         break;
       
   348       default:
       
   349         GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
       
   350             ("connect to %s:%d failed: %s", src->host, src->port,
       
   351                 g_strerror (errno)));
       
   352         return FALSE;
       
   353         break;
       
   354     }
       
   355   }
       
   356 
       
   357   return TRUE;
       
   358 
       
   359 socket_pair:
       
   360   {
       
   361     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
       
   362         GST_ERROR_SYSTEM);
       
   363     return FALSE;
       
   364   }
       
   365 no_socket:
       
   366   {
       
   367     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
       
   368     return FALSE;
       
   369   }
       
   370 name_resolv:
       
   371   {
       
   372     gst_tcp_client_src_stop (GST_BASE_SRC (src));
       
   373     return FALSE;
       
   374   }
       
   375 }
       
   376 
       
   377 /* close the socket and associated resources
       
   378  * unset OPEN flag
       
   379  * used both to recover from errors and go to NULL state */
       
   380 static gboolean
       
   381 gst_tcp_client_src_stop (GstBaseSrc * bsrc)
       
   382 {
       
   383   GstTCPClientSrc *src;
       
   384 
       
   385   src = GST_TCP_CLIENT_SRC (bsrc);
       
   386 
       
   387   GST_DEBUG_OBJECT (src, "closing socket");
       
   388 
       
   389   if (src->fdset != NULL) {
       
   390     gst_poll_free (src->fdset);
       
   391     src->fdset = NULL;
       
   392   }
       
   393 
       
   394   gst_tcp_socket_close (&src->sock_fd);
       
   395   src->caps_received = FALSE;
       
   396   if (src->caps) {
       
   397     gst_caps_unref (src->caps);
       
   398     src->caps = NULL;
       
   399   }
       
   400   GST_OBJECT_FLAG_UNSET (src, GST_TCP_CLIENT_SRC_OPEN);
       
   401 
       
   402   return TRUE;
       
   403 }
       
   404 
       
   405 /* will be called only between calls to start() and stop() */
       
   406 static gboolean
       
   407 gst_tcp_client_src_unlock (GstBaseSrc * bsrc)
       
   408 {
       
   409   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
       
   410 
       
   411   gst_poll_set_flushing (src->fdset, TRUE);
       
   412 
       
   413   return TRUE;
       
   414 }