gst_plugins_base/gst/tcp/gstmultifdsink.c
changeset 16 8e837d1bf446
parent 0 0e761a78d257
child 30 7e817e7e631c
--- a/gst_plugins_base/gst/tcp/gstmultifdsink.c	Wed Mar 24 17:58:42 2010 -0500
+++ b/gst_plugins_base/gst/tcp/gstmultifdsink.c	Wed Mar 24 18:04:17 2010 -0500
@@ -21,43 +21,36 @@
 
 /**
  * SECTION:element-multifdsink
- * @short_description: Send data to multiple file descriptors
  * @see_also: tcpserversink
  *
- * <refsect2>
- * <para>
  * This plugin writes incoming data to a set of file descriptors. The
- * file descriptors can be added to multifdsink by emitting the "add" signal. 
- * For each descriptor added, the "client-added" signal will be called.
- * </para>
- * <para>
- * As of version 0.10.8, a client can also be added with the "add-full" signal
+ * file descriptors can be added to multifdsink by emitting the #GstMultiFdSink::add signal. 
+ * For each descriptor added, the #GstMultiFdSink::client-added signal will be called.
+ *
+ * As of version 0.10.8, a client can also be added with the #GstMultiFdSink::add-full signal
  * that allows for more control over what and how much data a client 
  * initially receives.
- * </para>
- * <para>
- * Clients can be removed from multifdsink by emitting the "remove" signal. For
- * each descriptor removed, the "client-removed" signal will be called. The
- * "client-removed" signal can also be fired when multifdsink decides that a
+ *
+ * Clients can be removed from multifdsink by emitting the #GstMultiFdSink::remove signal. For
+ * each descriptor removed, the #GstMultiFdSink::client-removed signal will be called. The
+ * #GstMultiFdSink::client-removed signal can also be fired when multifdsink decides that a
  * client is not active anymore or, depending on the value of the
- * "recover-policy" property, if the client is reading too slowly.
+ * #GstMultiFdSink:recover-policy property, if the client is reading too slowly.
  * In all cases, multifdsink will never close a file descriptor itself.
  * The user of multifdsink is responsible for closing all file descriptors.
- * This can for example be done in response to the "client-fd-removed" signal.
+ * This can for example be done in response to the #GstMultiFdSink::client-fd-removed signal.
  * Note that multifdsink still has a reference to the file descriptor when the
- * "client-removed" signal is emitted, so that "get-stats" can be performed on
+ * #GstMultiFdSink::client-removed signal is emitted, so that "get-stats" can be performed on
  * the descriptor; it is therefore not safe to close the file descriptor in
- * the "client-removed" signal handler, and you should use the 
- * "client-fd-removed" signal to safely close the fd.
- * </para>
- * <para>
+ * the #GstMultiFdSink::client-removed signal handler, and you should use the 
+ * #GstMultiFdSink::client-fd-removed signal to safely close the fd.
+ *
  * Multifdsink internally keeps a queue of the incoming buffers and uses a
  * separate thread to send the buffers to the clients. This ensures that no
  * client write can block the pipeline and that clients can read with different
  * speeds.
- * </para>
- * <para>
- * When adding a client to multifdsink, the "sync-method" property will define
+ *
+ * When adding a client to multifdsink, the #GstMultiFdSink:sync-method property will define
  * which buffer in the queued buffers will be sent first to the client. Clients 
  * can be sent the most recent buffer (which might not be decodable by the 
  * client if it is not a keyframe), the next keyframe received in 
@@ -65,48 +58,42 @@
  * last received keyframe (which will cause a simple burst-on-connect). 
  * Multifdsink will always keep at least one keyframe in its internal buffers
  * when the sync-mode is set to latest-keyframe.
- * </para>
- * <para>
- * As of version 0.10.8, there are additional values for the sync-method 
+ *
+ * As of version 0.10.8, there are additional values for the #GstMultiFdSink:sync-method 
  * property to allow finer control over burst-on-connect behaviour. By selecting
  * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
  * additionally requires that the burst begin with a keyframe, and 
  * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
  * prefer a minimum burst size even if it requires not starting with a keyframe.
- * </para>
- * <para>
+ *
  * Multifdsink can be instructed to keep at least a minimum amount of data
  * expressed in time or byte units in its internal queues with the the 
- * "time-min" and "bytes-min" properties respectively. These properties are
- * useful if the application adds clients with the "add-full" signal to
- * make sure that a burst connect can actually be honored. 
- * </para>
- * <para>
+ * #GstMultiFdSink:time-min and #GstMultiFdSink:bytes-min properties respectively.
+ * These properties are useful if the application adds clients with the 
+ * #GstMultiFdSink::add-full signal to make sure that a burst connect can
+ * actually be honored. 
+ *
  * When streaming data, clients are allowed to read at a different rate than
  * the rate at which multifdsink receives data. If the client is reading too
  * fast, no data will be send to the client until multifdsink receives more
  * data. If the client, however, reads too slowly, data for that client will be 
  * queued up in multifdsink. Two properties control the amount of data 
- * (buffers) that is queued in multifdsink: "buffers-max" and 
- * "buffers-soft-max". A client that falls behind by "buffers-max" is removed 
- * from multifdsink forcibly.
- * </para>
- * <para>
- * A client with a lag of at least "buffers-soft-max" enters the recovery
- * procedure which is controlled with the "recover-policy" property. A recover
- * policy of NONE will do nothing, RESYNC_LATEST will send the most recently
+ * (buffers) that is queued in multifdsink: #GstMultiFdSink:buffers-max and 
+ * #GstMultiFdSink:buffers-soft-max. A client that falls behind by
+ * #GstMultiFdSink:buffers-max is removed from multifdsink forcibly.
+ *
+ * A client with a lag of at least #GstMultiFdSink:buffers-soft-max enters the recovery
+ * procedure which is controlled with the #GstMultiFdSink:recover-policy property.
+ * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
  * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
  * positions the client to the soft limit in the buffer queue and
  * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
  * buffer queue.
- * </para>
- * <para>
+ *
  * multifdsink will by default synchronize on the clock before serving the 
  * buffers to the clients. This behaviour can be disabled by setting the sync 
  * property to FALSE. Multifdsink will by default not do QoS and will never
  * drop late buffers.
- * </para>
- * </refsect2>
  *
  * Last reviewed on 2006-09-12 (0.10.10)
  */
@@ -117,11 +104,18 @@
 #include <gst/gst-i18n-plugin.h>
 
 #include <sys/ioctl.h>
+
+#ifdef HAVE_UNISTD_H
 #include <unistd.h>
+#endif
+
 #include <fcntl.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/stat.h>
+#include <netinet/in.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
 
 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
 #include <sys/filio.h>
@@ -176,16 +170,21 @@
 #define DEFAULT_TIME_MIN                -1
 #define DEFAULT_BYTES_MIN               -1
 #define DEFAULT_BUFFERS_MIN             -1
-#define DEFAULT_UNIT_TYPE               GST_UNIT_TYPE_BUFFERS
+#define DEFAULT_UNIT_TYPE               GST_TCP_UNIT_TYPE_BUFFERS
 #define DEFAULT_UNITS_MAX               -1
 #define DEFAULT_UNITS_SOFT_MAX          -1
 #define DEFAULT_RECOVER_POLICY          GST_RECOVER_POLICY_NONE
 #define DEFAULT_TIMEOUT                 0
 #define DEFAULT_SYNC_METHOD             GST_SYNC_METHOD_LATEST
 
-#define DEFAULT_BURST_UNIT              GST_UNIT_TYPE_UNDEFINED
+#define DEFAULT_BURST_UNIT              GST_TCP_UNIT_TYPE_UNDEFINED
 #define DEFAULT_BURST_VALUE             0
 
+#define DEFAULT_QOS_DSCP                -1
+#define DEFAULT_HANDLE_READ             TRUE
+
+#define DEFAULT_RESEND_STREAMHEADER      TRUE
+
 enum
 {
   PROP_0,
@@ -214,6 +213,16 @@
 
   PROP_BURST_UNIT,
   PROP_BURST_VALUE,
+
+  PROP_QOS_DSCP,
+
+  PROP_HANDLE_READ,
+
+  PROP_RESEND_STREAMHEADER,
+
+  PROP_NUM_FDS,
+
+  PROP_LAST
 };
 
 /* For backward compat, we can't really select the poll mode anymore with
@@ -229,6 +238,7 @@
     {2, "EPoll", "epoll"},
     {0, NULL, NULL},
   };
+
   if (!fdset_mode_type) {
     fdset_mode_type = g_enum_register_static ("GstFDSetMode", fdset_mode);
   }
@@ -294,10 +304,10 @@
 {
   static GType unit_type_type = 0;
   static const GEnumValue unit_type[] = {
-    {GST_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"},
-    {GST_UNIT_TYPE_BUFFERS, "Buffers", "buffers"},
-    {GST_UNIT_TYPE_BYTES, "Bytes", "bytes"},
-    {GST_UNIT_TYPE_TIME, "Time", "time"},
+    {GST_TCP_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"},
+    {GST_TCP_UNIT_TYPE_BUFFERS, "Buffers", "buffers"},
+    {GST_TCP_UNIT_TYPE_BYTES, "Bytes", "bytes"},
+    {GST_TCP_UNIT_TYPE_TIME, "Time", "time"},
     {0, NULL, NULL},
   };
 
@@ -378,7 +388,8 @@
 
   g_object_class_install_property (gobject_class, PROP_PROTOCOL,
       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
-          GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
+          GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /**
    * GstMultiFdSink::mode
@@ -391,88 +402,132 @@
   g_object_class_install_property (gobject_class, PROP_MODE,
       g_param_spec_enum ("mode", "Mode",
           "The mode for selecting activity on the fds (deprecated)",
-          GST_TYPE_FDSET_MODE, DEFAULT_MODE, G_PARAM_READWRITE));
+          GST_TYPE_FDSET_MODE, DEFAULT_MODE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
       g_param_spec_int ("buffers-max", "Buffers max",
           "max number of buffers to queue for a client (-1 = no limit)", -1,
-          G_MAXINT, DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE));
-  g_object_class_install_property (gobject_class,
-      PROP_BUFFERS_SOFT_MAX, g_param_spec_int ("buffers-soft-max",
-          "Buffers soft max",
+          G_MAXINT, DEFAULT_BUFFERS_MAX,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
+      g_param_spec_int ("buffers-soft-max", "Buffers soft max",
           "Recover client when going over this limit (-1 = no limit)", -1,
-          G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
+          G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_BYTES_MIN,
       g_param_spec_int ("bytes-min", "Bytes min",
           "min number of bytes to queue (-1 = as little as possible)", -1,
-          G_MAXINT, DEFAULT_BYTES_MIN, G_PARAM_READWRITE));
+          G_MAXINT, DEFAULT_BYTES_MIN,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_TIME_MIN,
       g_param_spec_int64 ("time-min", "Time min",
           "min number of time to queue (-1 = as little as possible)", -1,
-          G_MAXINT64, DEFAULT_TIME_MIN, G_PARAM_READWRITE));
+          G_MAXINT64, DEFAULT_TIME_MIN,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_BUFFERS_MIN,
       g_param_spec_int ("buffers-min", "Buffers min",
           "min number of buffers to queue (-1 = as few as possible)", -1,
-          G_MAXINT, DEFAULT_BUFFERS_MIN, G_PARAM_READWRITE));
+          G_MAXINT, DEFAULT_BUFFERS_MIN,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
       g_param_spec_enum ("unit-type", "Units type",
           "The unit to measure the max/soft-max/queued properties",
-          GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE));
+          GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
       g_param_spec_int64 ("units-max", "Units max",
           "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
-          DEFAULT_UNITS_MAX, G_PARAM_READWRITE));
+          DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
       g_param_spec_int64 ("units-soft-max", "Units soft max",
           "Recover client when going over this limit (-1 = no limit)", -1,
-          G_MAXINT64, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
+          G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED,
       g_param_spec_uint ("buffers-queued", "Buffers queued",
           "Number of buffers currently queued", 0, G_MAXUINT, 0,
-          G_PARAM_READABLE));
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 #if NOT_IMPLEMENTED
   g_object_class_install_property (gobject_class, PROP_BYTES_QUEUED,
       g_param_spec_uint ("bytes-queued", "Bytes queued",
           "Number of bytes currently queued", 0, G_MAXUINT, 0,
-          G_PARAM_READABLE));
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_TIME_QUEUED,
       g_param_spec_uint64 ("time-queued", "Time queued",
           "Number of time currently queued", 0, G_MAXUINT64, 0,
-          G_PARAM_READABLE));
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 #endif
 
   g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY,
       g_param_spec_enum ("recover-policy", "Recover Policy",
           "How to recover when client reaches the soft max",
-          GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE));
+          GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
       g_param_spec_uint64 ("timeout", "Timeout",
           "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
-          0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READWRITE));
+          0, G_MAXUINT64, DEFAULT_TIMEOUT,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_SYNC_METHOD,
       g_param_spec_enum ("sync-method", "Sync Method",
-          "How to sync new clients to the stream",
-          GST_TYPE_SYNC_METHOD, DEFAULT_SYNC_METHOD, G_PARAM_READWRITE));
+          "How to sync new clients to the stream", GST_TYPE_SYNC_METHOD,
+          DEFAULT_SYNC_METHOD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_BYTES_TO_SERVE,
       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
-          G_PARAM_READABLE));
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_BYTES_SERVED,
       g_param_spec_uint64 ("bytes-served", "Bytes served",
           "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
-          G_PARAM_READABLE));
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_BURST_UNIT,
       g_param_spec_enum ("burst-unit", "Burst unit",
           "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
-          GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT, G_PARAM_READWRITE));
+          GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
       g_param_spec_uint64 ("burst-value", "Burst value",
-          "The amount of burst expressed in burst-unit",
-          0, G_MAXUINT64, DEFAULT_BURST_VALUE, G_PARAM_READWRITE));
+          "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
+          DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
+      g_param_spec_int ("qos-dscp", "QoS diff srv code point",
+          "Quality of Service, differentiated services code point (-1 default)",
+          -1, 63, DEFAULT_QOS_DSCP,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  /**
+   * GstMultiFdSink::handle-read
+   *
+   * Handle read requests from clients and discard the data.
+   *
+   * Since: 0.10.23
+   */
+  g_object_class_install_property (gobject_class, PROP_HANDLE_READ,
+      g_param_spec_boolean ("handle-read", "Handle Read",
+          "Handle client reads and discard the data",
+          DEFAULT_HANDLE_READ, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  /**
+   * GstMultiFdSink::resend-streamheader
+   *
+   * Resend the streamheaders to existing clients when they change.
+   *
+   * Since: 0.10.23
+   */
+  g_object_class_install_property (gobject_class, PROP_RESEND_STREAMHEADER,
+      g_param_spec_boolean ("resend-streamheader", "Resend streamheader",
+          "Resend the streamheader if it changes in the caps",
+          DEFAULT_RESEND_STREAMHEADER,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_NUM_FDS,
+      g_param_spec_uint ("num-fds", "Number of fds",
+          "The current number of client file descriptors.",
+          0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
   /**
    * GstMultiFdSink::add:
@@ -490,7 +545,7 @@
    * GstMultiFdSink::add-full:
    * @gstmultifdsink: the multifdsink element to emit this signal on
    * @fd:             the file descriptor to add to multifdsink
-   * @keyframe:       start bursting from a keyframe
+   * @sync:           the sync method to use
    * @unit_type_min:  the unit-type of @value_min
    * @value_min:      the minimum amount of data to burst expressed in
    *                  @unit_type_min units.
@@ -505,8 +560,8 @@
       g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
           add_full), NULL, NULL,
-      gst_tcp_marshal_VOID__INT_BOOLEAN_INT_UINT64_INT_UINT64, G_TYPE_NONE, 6,
-      G_TYPE_INT, G_TYPE_BOOLEAN, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64,
+      gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64, G_TYPE_NONE, 6,
+      G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64,
       GST_TYPE_UNIT_TYPE, G_TYPE_UINT64);
   /**
    * GstMultiFdSink::remove:
@@ -521,7 +576,7 @@
           remove), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1,
       G_TYPE_INT);
   /**
-   * GstMultiFdSink::remove_flush:
+   * GstMultiFdSink::remove-flush:
    * @gstmultifdsink: the multifdsink element to emit this signal on
    * @fd:             the file descriptor to remove from multifdsink
    *
@@ -662,6 +717,11 @@
   this->def_burst_unit = DEFAULT_BURST_UNIT;
   this->def_burst_value = DEFAULT_BURST_VALUE;
 
+  this->qos_dscp = DEFAULT_QOS_DSCP;
+  this->handle_read = DEFAULT_HANDLE_READ;
+
+  this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER;
+
   this->header_flags = 0;
 }
 
@@ -679,6 +739,84 @@
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
+static gint
+setup_dscp_client (GstMultiFdSink * sink, GstTCPClient * client)
+{
+  gint tos;
+  gint ret;
+  union gst_sockaddr
+  {
+    struct sockaddr sa;
+    struct sockaddr_in6 sa_in6;
+    struct sockaddr_storage sa_stor;
+  } sa;
+  socklen_t slen = sizeof (sa);
+  gint af;
+
+  /* don't touch */
+  if (sink->qos_dscp < 0)
+    return 0;
+
+  if ((ret = getsockname (client->fd.fd, &sa.sa, &slen)) < 0) {
+    GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
+    return ret;
+  }
+
+  af = sa.sa.sa_family;
+
+  /* if this is an IPv4-mapped address then do IPv4 QoS */
+  if (af == AF_INET6) {
+
+    GST_DEBUG_OBJECT (sink, "check IP6 socket");
+    if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
+      GST_DEBUG_OBJECT (sink, "mapped to IPV4");
+      af = AF_INET;
+    }
+  }
+
+  /* extract and shift 6 bits of the DSCP */
+  tos = (sink->qos_dscp & 0x3f) << 2;
+
+  switch (af) {
+    case AF_INET:
+      ret = setsockopt (client->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
+      break;
+    case AF_INET6:
+#ifdef IPV6_TCLASS
+      ret =
+          setsockopt (client->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos,
+          sizeof (tos));
+      break;
+#endif
+    default:
+      ret = 0;
+      GST_ERROR_OBJECT (sink, "unsupported AF");
+      break;
+  }
+  if (ret)
+    GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
+
+  return ret;
+}
+
+
+static void
+setup_dscp (GstMultiFdSink * sink)
+{
+  GList *clients, *next;
+
+  CLIENTS_LOCK (sink);
+  for (clients = sink->clients; clients; clients = next) {
+    GstTCPClient *client;
+
+    client = (GstTCPClient *) clients->data;
+    next = g_list_next (clients);
+
+    setup_dscp_client (sink, client);
+  }
+  CLIENTS_UNLOCK (sink);
+}
+
 /* "add-full" signal implementation */
 #ifdef __SYMBIAN32__
 EXPORT_C
@@ -686,8 +824,8 @@
 
 void
 gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
-    GstSyncMethod sync_method, GstUnitType min_unit, guint64 min_value,
-    GstUnitType max_unit, guint64 max_value)
+    GstSyncMethod sync_method, GstTCPUnitType min_unit, guint64 min_value,
+    GstTCPUnitType max_unit, guint64 max_value)
 {
   GstTCPClient *client;
   GList *clink;
@@ -745,21 +883,23 @@
   sink->clients_cookie++;
 
   /* set the socket to non blocking */
-//temporary commented for multifdsink issue  
-  //res = fcntl (fd, F_SETFL, O_NONBLOCK);  //Arun
-  
+//  temporary  fix for multifdsink
+  //res = fcntl (fd, F_SETFL, O_NONBLOCK);
   /* we always read from a client */
   gst_poll_add_fd (sink->fdset, &client->fd);
 
   /* we don't try to read from write only fds */
-  flags = fcntl (fd, F_GETFL, 0);
-  if ((flags & O_ACCMODE) != O_WRONLY) {
-    gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE);
+  if (sink->handle_read) {
+    flags = fcntl (fd, F_GETFL, 0);
+    if ((flags & O_ACCMODE) != O_WRONLY) {
+      gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE);
+    }
   }
   /* figure out the mode, can't use send() for non sockets */
   res = fstat (fd, &statbuf);
   if (S_ISSOCK (statbuf.st_mode)) {
     client->is_socket = TRUE;
+    setup_dscp_client (sink, client);
   }
 
   gst_poll_restart (sink->fdset);
@@ -1279,14 +1419,21 @@
           send_streamheader = TRUE;
         } else {
           /* both old and new caps have streamheader set */
-          sh1 = gst_structure_get_value (s, "streamheader");
-          s = gst_caps_get_structure (caps, 0);
-          sh2 = gst_structure_get_value (s, "streamheader");
-          if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
+          if (!sink->resend_streamheader) {
             GST_DEBUG_OBJECT (sink,
-                "[fd %5d] new streamheader different from old, sending",
+                "[fd %5d] asked to not resend the streamheader, not sending",
                 client->fd.fd);
-            send_streamheader = TRUE;
+            send_streamheader = FALSE;
+          } else {
+            sh1 = gst_structure_get_value (s, "streamheader");
+            s = gst_caps_get_structure (caps, 0);
+            sh2 = gst_structure_get_value (s, "streamheader");
+            if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
+              GST_DEBUG_OBJECT (sink,
+                  "[fd %5d] new streamheader different from old, sending",
+                  client->fd.fd);
+              send_streamheader = TRUE;
+            }
           }
         }
       }
@@ -1414,9 +1561,9 @@
 get_buffers_max (GstMultiFdSink * sink, gint64 max)
 {
   switch (sink->unit_type) {
-    case GST_UNIT_TYPE_BUFFERS:
+    case GST_TCP_UNIT_TYPE_BUFFERS:
       return max;
-    case GST_UNIT_TYPE_TIME:
+    case GST_TCP_UNIT_TYPE_TIME:
     {
       GstBuffer *buf;
       int i;
@@ -1440,7 +1587,7 @@
       }
       return len + 1;
     }
-    case GST_UNIT_TYPE_BYTES:
+    case GST_TCP_UNIT_TYPE_BYTES:
     {
       GstBuffer *buf;
       int i;
@@ -1583,23 +1730,23 @@
  * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
  */
 static gboolean
-assign_value (GstUnitType unit, guint64 value, gint * bytes, gint * buffers,
+assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers,
     GstClockTime * time)
 {
   gboolean res = TRUE;
 
   /* set only the limit of the given format to the given value */
   switch (unit) {
-    case GST_UNIT_TYPE_BUFFERS:
+    case GST_TCP_UNIT_TYPE_BUFFERS:
       *buffers = (gint) value;
       break;
-    case GST_UNIT_TYPE_TIME:
+    case GST_TCP_UNIT_TYPE_TIME:
       *time = value;
       break;
-    case GST_UNIT_TYPE_BYTES:
+    case GST_TCP_UNIT_TYPE_BYTES:
       *bytes = (gint) value;
       break;
-    case GST_UNIT_TYPE_UNDEFINED:
+    case GST_TCP_UNIT_TYPE_UNDEFINED:
     default:
       res = FALSE;
       break;
@@ -1616,8 +1763,9 @@
  * function returns FALSE.
  */
 static gboolean
-count_burst_unit (GstMultiFdSink * sink, gint * min_idx, GstUnitType min_unit,
-    guint64 min_value, gint * max_idx, GstUnitType max_unit, guint64 max_value)
+count_burst_unit (GstMultiFdSink * sink, gint * min_idx,
+    GstTCPUnitType min_unit, guint64 min_value, gint * max_idx,
+    GstTCPUnitType max_unit, guint64 max_value)
 {
   gint bytes_min = -1, buffers_min = -1;
   gint bytes_max = -1, buffers_max = -1;
@@ -2238,11 +2386,12 @@
     GstBuffer *buf;
 
     /* no point in searching beyond the soft-max if any. */
-    if (soft_max_buffers) {
+    if (soft_max_buffers > 0) {
       limit = MIN (limit, soft_max_buffers);
     }
-    GST_LOG_OBJECT (sink, "extending queue to include sync point, now at %d",
-        max_buffer_usage);
+    GST_LOG_OBJECT (sink,
+        "extending queue to include sync point, now at %d, limit is %d",
+        max_buffer_usage, limit);
     for (i = 0; i < limit; i++) {
       buf = g_array_index (sink->bufqueue, GstBuffer *, i);
       if (is_sync_frame (sink, buf)) {
@@ -2342,6 +2491,7 @@
                 fd, g_strerror (errno), errno);
             if (errno == EBADF) {
               client->status = GST_CLIENT_STATUS_ERROR;
+              /* releases the CLIENTS lock */
               gst_multi_fd_sink_remove_client_link (sink, clients);
             }
           }
@@ -2587,6 +2737,16 @@
     case PROP_BURST_VALUE:
       multifdsink->def_burst_value = g_value_get_uint64 (value);
       break;
+    case PROP_QOS_DSCP:
+      multifdsink->qos_dscp = g_value_get_int (value);
+      setup_dscp (multifdsink);
+      break;
+    case PROP_HANDLE_READ:
+      multifdsink->handle_read = g_value_get_boolean (value);
+      break;
+    case PROP_RESEND_STREAMHEADER:
+      multifdsink->resend_streamheader = g_value_get_boolean (value);
+      break;
 
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -2663,6 +2823,18 @@
     case PROP_BURST_VALUE:
       g_value_set_uint64 (value, multifdsink->def_burst_value);
       break;
+    case PROP_QOS_DSCP:
+      g_value_set_int (value, multifdsink->qos_dscp);
+      break;
+    case PROP_HANDLE_READ:
+      g_value_set_boolean (value, multifdsink->handle_read);
+      break;
+    case PROP_RESEND_STREAMHEADER:
+      g_value_set_boolean (value, multifdsink->resend_streamheader);
+      break;
+    case PROP_NUM_FDS:
+      g_value_set_uint (value, g_hash_table_size (multifdsink->fd_hash));
+      break;
 
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);