gst_plugins_base/gst/tcp/gstmultifdsink.c
branchRCL_3
changeset 30 7e817e7e631c
parent 29 567bb019e3e3
equal deleted inserted replaced
29:567bb019e3e3 30:7e817e7e631c
    19  * Boston, MA 02111-1307, USA.
    19  * Boston, MA 02111-1307, USA.
    20  */
    20  */
    21 
    21 
    22 /**
    22 /**
    23  * SECTION:element-multifdsink
    23  * SECTION:element-multifdsink
       
    24  * @short_description: Send data to multiple file descriptors
    24  * @see_also: tcpserversink
    25  * @see_also: tcpserversink
    25  *
    26  *
       
    27  * <refsect2>
       
    28  * <para>
    26  * This plugin writes incoming data to a set of file descriptors. The
    29  * This plugin writes incoming data to a set of file descriptors. The
    27  * file descriptors can be added to multifdsink by emitting the #GstMultiFdSink::add signal. 
    30  * file descriptors can be added to multifdsink by emitting the "add" signal. 
    28  * For each descriptor added, the #GstMultiFdSink::client-added signal will be called.
    31  * For each descriptor added, the "client-added" signal will be called.
    29  *
    32  * </para>
    30  * As of version 0.10.8, a client can also be added with the #GstMultiFdSink::add-full signal
    33  * <para>
       
    34  * As of version 0.10.8, a client can also be added with the "add-full" signal
    31  * that allows for more control over what and how much data a client 
    35  * that allows for more control over what and how much data a client 
    32  * initially receives.
    36  * initially receives.
    33  *
    37  * </para>
    34  * Clients can be removed from multifdsink by emitting the #GstMultiFdSink::remove signal. For
    38  * <para>
    35  * each descriptor removed, the #GstMultiFdSink::client-removed signal will be called. The
    39  * Clients can be removed from multifdsink by emitting the "remove" signal. For
    36  * #GstMultiFdSink::client-removed signal can also be fired when multifdsink decides that a
    40  * each descriptor removed, the "client-removed" signal will be called. The
       
    41  * "client-removed" signal can also be fired when multifdsink decides that a
    37  * client is not active anymore or, depending on the value of the
    42  * client is not active anymore or, depending on the value of the
    38  * #GstMultiFdSink:recover-policy property, if the client is reading too slowly.
    43  * "recover-policy" property, if the client is reading too slowly.
    39  * In all cases, multifdsink will never close a file descriptor itself.
    44  * In all cases, multifdsink will never close a file descriptor itself.
    40  * The user of multifdsink is responsible for closing all file descriptors.
    45  * The user of multifdsink is responsible for closing all file descriptors.
    41  * This can for example be done in response to the #GstMultiFdSink::client-fd-removed signal.
    46  * This can for example be done in response to the "client-fd-removed" signal.
    42  * Note that multifdsink still has a reference to the file descriptor when the
    47  * Note that multifdsink still has a reference to the file descriptor when the
    43  * #GstMultiFdSink::client-removed signal is emitted, so that "get-stats" can be performed on
    48  * "client-removed" signal is emitted, so that "get-stats" can be performed on
    44  * the descriptor; it is therefore not safe to close the file descriptor in
    49  * the descriptor; it is therefore not safe to close the file descriptor in
    45  * the #GstMultiFdSink::client-removed signal handler, and you should use the 
    50  * the "client-removed" signal handler, and you should use the 
    46  * #GstMultiFdSink::client-fd-removed signal to safely close the fd.
    51  * "client-fd-removed" signal to safely close the fd.
    47  *
    52  * </para>
       
    53  * <para>
    48  * Multifdsink internally keeps a queue of the incoming buffers and uses a
    54  * Multifdsink internally keeps a queue of the incoming buffers and uses a
    49  * separate thread to send the buffers to the clients. This ensures that no
    55  * separate thread to send the buffers to the clients. This ensures that no
    50  * client write can block the pipeline and that clients can read with different
    56  * client write can block the pipeline and that clients can read with different
    51  * speeds.
    57  * speeds.
    52  *
    58  * </para>
    53  * When adding a client to multifdsink, the #GstMultiFdSink:sync-method property will define
    59  * <para>
       
    60  * When adding a client to multifdsink, the "sync-method" property will define
    54  * which buffer in the queued buffers will be sent first to the client. Clients 
    61  * which buffer in the queued buffers will be sent first to the client. Clients 
    55  * can be sent the most recent buffer (which might not be decodable by the 
    62  * can be sent the most recent buffer (which might not be decodable by the 
    56  * client if it is not a keyframe), the next keyframe received in 
    63  * client if it is not a keyframe), the next keyframe received in 
    57  * multifdsink (which can take some time depending on the keyframe rate), or the
    64  * multifdsink (which can take some time depending on the keyframe rate), or the
    58  * last received keyframe (which will cause a simple burst-on-connect). 
    65  * last received keyframe (which will cause a simple burst-on-connect). 
    59  * Multifdsink will always keep at least one keyframe in its internal buffers
    66  * Multifdsink will always keep at least one keyframe in its internal buffers
    60  * when the sync-method is set to latest-keyframe.
    67  * when the sync-method is set to latest-keyframe.
    61  *
    68  * </para>
    62  * As of version 0.10.8, there are additional values for the #GstMultiFdSink:sync-method 
    69  * <para>
       
    70  * As of version 0.10.8, there are additional values for the sync-method 
    63  * property to allow finer control over burst-on-connect behaviour. By selecting
    71  * property to allow finer control over burst-on-connect behaviour. By selecting
    64  * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
    72  * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
    65  * additionally requires that the burst begin with a keyframe, and 
    73  * additionally requires that the burst begin with a keyframe, and 
    66  * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
    74  * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
    67  * prefer a minimum burst size even if it requires not starting with a keyframe.
    75  * prefer a minimum burst size even if it requires not starting with a keyframe.
    68  *
    76  * </para>
       
    77  * <para>
    69  * Multifdsink can be instructed to keep at least a minimum amount of data
    78  * Multifdsink can be instructed to keep at least a minimum amount of data
    70  * expressed in time or byte units in its internal queues with the 
    79  * expressed in time or byte units in its internal queues with the 
    71  * #GstMultiFdSink:time-min and #GstMultiFdSink:bytes-min properties respectively.
    80  * "time-min" and "bytes-min" properties respectively. These properties are
    72  * These properties are useful if the application adds clients with the 
    81  * useful if the application adds clients with the "add-full" signal to
    73  * #GstMultiFdSink::add-full signal to make sure that a burst connect can
    82  * make sure that a burst connect can actually be honored. 
    74  * actually be honored. 
    83  * </para>
    75  *
    84  * <para>
    76  * When streaming data, clients are allowed to read at a different rate than
    85  * When streaming data, clients are allowed to read at a different rate than
    77  * the rate at which multifdsink receives data. If the client is reading too
    86  * the rate at which multifdsink receives data. If the client is reading too
    78  * fast, no data will be sent to the client until multifdsink receives more
    87  * fast, no data will be sent to the client until multifdsink receives more
    79  * data. If the client, however, reads too slowly, data for that client will be 
    88  * data. If the client, however, reads too slowly, data for that client will be 
    80  * queued up in multifdsink. Two properties control the amount of data 
    89  * queued up in multifdsink. Two properties control the amount of data 
    81  * (buffers) that is queued in multifdsink: #GstMultiFdSink:buffers-max and 
    90  * (buffers) that is queued in multifdsink: "buffers-max" and 
    82  * #GstMultiFdSink:buffers-soft-max. A client that falls behind by
    91  * "buffers-soft-max". A client that falls behind by "buffers-max" is removed 
    83  * #GstMultiFdSink:buffers-max is removed from multifdsink forcibly.
    92  * from multifdsink forcibly.
    84  *
    93  * </para>
    85  * A client with a lag of at least #GstMultiFdSink:buffers-soft-max enters the recovery
    94  * <para>
    86  * procedure which is controlled with the #GstMultiFdSink:recover-policy property.
    95  * A client with a lag of at least "buffers-soft-max" enters the recovery
    87  * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
    96  * procedure which is controlled with the "recover-policy" property. A recover
       
    97  * policy of NONE will do nothing, RESYNC_LATEST will send the most recently
    88  * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
    98  * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
    89  * positions the client to the soft limit in the buffer queue and
    99  * positions the client to the soft limit in the buffer queue and
    90  * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
   100  * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
    91  * buffer queue.
   101  * buffer queue.
    92  *
   102  * </para>
       
   103  * <para>
    93  * multifdsink will by default synchronize on the clock before serving the 
   104  * multifdsink will by default synchronize on the clock before serving the 
    94  * buffers to the clients. This behaviour can be disabled by setting the sync 
   105  * buffers to the clients. This behaviour can be disabled by setting the sync 
    95  * property to FALSE. Multifdsink will by default not do QoS and will never
   106  * property to FALSE. Multifdsink will by default not do QoS and will never
    96  * drop late buffers.
   107  * drop late buffers.
       
   108  * </para>
       
   109  * </refsect2>
    97  *
   110  *
    98  * Last reviewed on 2006-09-12 (0.10.10)
   111  * Last reviewed on 2006-09-12 (0.10.10)
    99  */
   112  */
   100 
   113 
   101 #ifdef HAVE_CONFIG_H
   114 #ifdef HAVE_CONFIG_H
   102 #include "config.h"
   115 #include "config.h"
   103 #endif
   116 #endif
   104 #include <gst/gst-i18n-plugin.h>
   117 #include <gst/gst-i18n-plugin.h>
   105 
   118 
   106 #include <sys/ioctl.h>
   119 #include <sys/ioctl.h>
   107 
       
   108 #ifdef HAVE_UNISTD_H
       
   109 #include <unistd.h>
   120 #include <unistd.h>
   110 #endif
       
   111 
       
   112 #include <fcntl.h>
   121 #include <fcntl.h>
   113 #include <sys/types.h>
   122 #include <sys/types.h>
   114 #include <sys/socket.h>
   123 #include <sys/socket.h>
   115 #include <sys/stat.h>
   124 #include <sys/stat.h>
   116 #include <netinet/in.h>
       
   117 #include <netinet/in.h>
       
   118 #include <arpa/inet.h>
       
   119 
   125 
   120 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
   126 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
   121 #include <sys/filio.h>
   127 #include <sys/filio.h>
   122 #endif
   128 #endif
   123 
   129 
   168 #define DEFAULT_BUFFERS_MAX             -1
   174 #define DEFAULT_BUFFERS_MAX             -1
   169 #define DEFAULT_BUFFERS_SOFT_MAX        -1
   175 #define DEFAULT_BUFFERS_SOFT_MAX        -1
   170 #define DEFAULT_TIME_MIN                -1
   176 #define DEFAULT_TIME_MIN                -1
   171 #define DEFAULT_BYTES_MIN               -1
   177 #define DEFAULT_BYTES_MIN               -1
   172 #define DEFAULT_BUFFERS_MIN             -1
   178 #define DEFAULT_BUFFERS_MIN             -1
   173 #define DEFAULT_UNIT_TYPE               GST_TCP_UNIT_TYPE_BUFFERS
   179 #define DEFAULT_UNIT_TYPE               GST_UNIT_TYPE_BUFFERS
   174 #define DEFAULT_UNITS_MAX               -1
   180 #define DEFAULT_UNITS_MAX               -1
   175 #define DEFAULT_UNITS_SOFT_MAX          -1
   181 #define DEFAULT_UNITS_SOFT_MAX          -1
   176 #define DEFAULT_RECOVER_POLICY          GST_RECOVER_POLICY_NONE
   182 #define DEFAULT_RECOVER_POLICY          GST_RECOVER_POLICY_NONE
   177 #define DEFAULT_TIMEOUT                 0
   183 #define DEFAULT_TIMEOUT                 0
   178 #define DEFAULT_SYNC_METHOD             GST_SYNC_METHOD_LATEST
   184 #define DEFAULT_SYNC_METHOD             GST_SYNC_METHOD_LATEST
   179 
   185 
   180 #define DEFAULT_BURST_UNIT              GST_TCP_UNIT_TYPE_UNDEFINED
   186 #define DEFAULT_BURST_UNIT              GST_UNIT_TYPE_UNDEFINED
   181 #define DEFAULT_BURST_VALUE             0
   187 #define DEFAULT_BURST_VALUE             0
   182 
       
   183 #define DEFAULT_QOS_DSCP                -1
       
   184 #define DEFAULT_HANDLE_READ             TRUE
       
   185 
       
   186 #define DEFAULT_RESEND_STREAMHEADER      TRUE
       
   187 
   188 
   188 enum
   189 enum
   189 {
   190 {
   190   PROP_0,
   191   PROP_0,
   191   PROP_PROTOCOL,
   192   PROP_PROTOCOL,
   211   PROP_BYTES_TO_SERVE,
   212   PROP_BYTES_TO_SERVE,
   212   PROP_BYTES_SERVED,
   213   PROP_BYTES_SERVED,
   213 
   214 
   214   PROP_BURST_UNIT,
   215   PROP_BURST_UNIT,
   215   PROP_BURST_VALUE,
   216   PROP_BURST_VALUE,
   216 
       
   217   PROP_QOS_DSCP,
       
   218 
       
   219   PROP_HANDLE_READ,
       
   220 
       
   221   PROP_RESEND_STREAMHEADER,
       
   222 
       
   223   PROP_NUM_FDS,
       
   224 
       
   225   PROP_LAST
       
   226 };
   217 };
   227 
   218 
   228 /* For backward compat, we can't really select the poll mode anymore with
   219 /* For backward compat, we can't really select the poll mode anymore with
   229  * GstPoll. */
   220  * GstPoll. */
   230 #define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type())
   221 #define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type())
   236     {0, "Select", "select"},
   227     {0, "Select", "select"},
   237     {1, "Poll", "poll"},
   228     {1, "Poll", "poll"},
   238     {2, "EPoll", "epoll"},
   229     {2, "EPoll", "epoll"},
   239     {0, NULL, NULL},
   230     {0, NULL, NULL},
   240   };
   231   };
   241 
       
   242   if (!fdset_mode_type) {
   232   if (!fdset_mode_type) {
   243     fdset_mode_type = g_enum_register_static ("GstFDSetMode", fdset_mode);
   233     fdset_mode_type = g_enum_register_static ("GstFDSetMode", fdset_mode);
   244   }
   234   }
   245   return fdset_mode_type;
   235   return fdset_mode_type;
   246 }
   236 }
   302 static GType
   292 static GType
   303 gst_unit_type_get_type (void)
   293 gst_unit_type_get_type (void)
   304 {
   294 {
   305   static GType unit_type_type = 0;
   295   static GType unit_type_type = 0;
   306   static const GEnumValue unit_type[] = {
   296   static const GEnumValue unit_type[] = {
   307     {GST_TCP_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"},
   297     {GST_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"},
   308     {GST_TCP_UNIT_TYPE_BUFFERS, "Buffers", "buffers"},
   298     {GST_UNIT_TYPE_BUFFERS, "Buffers", "buffers"},
   309     {GST_TCP_UNIT_TYPE_BYTES, "Bytes", "bytes"},
   299     {GST_UNIT_TYPE_BYTES, "Bytes", "bytes"},
   310     {GST_TCP_UNIT_TYPE_TIME, "Time", "time"},
   300     {GST_UNIT_TYPE_TIME, "Time", "time"},
   311     {0, NULL, NULL},
   301     {0, NULL, NULL},
   312   };
   302   };
   313 
   303 
   314   if (!unit_type_type) {
   304   if (!unit_type_type) {
   315     unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
   305     unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type);
   386   gobject_class->get_property = gst_multi_fd_sink_get_property;
   376   gobject_class->get_property = gst_multi_fd_sink_get_property;
   387   gobject_class->finalize = gst_multi_fd_sink_finalize;
   377   gobject_class->finalize = gst_multi_fd_sink_finalize;
   388 
   378 
   389   g_object_class_install_property (gobject_class, PROP_PROTOCOL,
   379   g_object_class_install_property (gobject_class, PROP_PROTOCOL,
   390       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
   380       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
   391           GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL,
   381           GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
   392           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   393 
   382 
   394   /**
   383   /**
   395    * GstMultiFdSink::mode
   384    * GstMultiFdSink::mode
   396    *
   385    *
   397    * The mode for selecting activity on the fds. 
   386    * The mode for selecting activity on the fds. 
   400    * select and use the most optimal method.
   389    * select and use the most optimal method.
   401    */
   390    */
   402   g_object_class_install_property (gobject_class, PROP_MODE,
   391   g_object_class_install_property (gobject_class, PROP_MODE,
   403       g_param_spec_enum ("mode", "Mode",
   392       g_param_spec_enum ("mode", "Mode",
   404           "The mode for selecting activity on the fds (deprecated)",
   393           "The mode for selecting activity on the fds (deprecated)",
   405           GST_TYPE_FDSET_MODE, DEFAULT_MODE,
   394           GST_TYPE_FDSET_MODE, DEFAULT_MODE, G_PARAM_READWRITE));
   406           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   407 
   395 
   408   g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
   396   g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
   409       g_param_spec_int ("buffers-max", "Buffers max",
   397       g_param_spec_int ("buffers-max", "Buffers max",
   410           "max number of buffers to queue for a client (-1 = no limit)", -1,
   398           "max number of buffers to queue for a client (-1 = no limit)", -1,
   411           G_MAXINT, DEFAULT_BUFFERS_MAX,
   399           G_MAXINT, DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE));
   412           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   400   g_object_class_install_property (gobject_class,
   413   g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX,
   401       PROP_BUFFERS_SOFT_MAX, g_param_spec_int ("buffers-soft-max",
   414       g_param_spec_int ("buffers-soft-max", "Buffers soft max",
   402           "Buffers soft max",
   415           "Recover client when going over this limit (-1 = no limit)", -1,
   403           "Recover client when going over this limit (-1 = no limit)", -1,
   416           G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX,
   404           G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE));
   417           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   418 
   405 
   419   g_object_class_install_property (gobject_class, PROP_BYTES_MIN,
   406   g_object_class_install_property (gobject_class, PROP_BYTES_MIN,
   420       g_param_spec_int ("bytes-min", "Bytes min",
   407       g_param_spec_int ("bytes-min", "Bytes min",
   421           "min number of bytes to queue (-1 = as little as possible)", -1,
   408           "min number of bytes to queue (-1 = as little as possible)", -1,
   422           G_MAXINT, DEFAULT_BYTES_MIN,
   409           G_MAXINT, DEFAULT_BYTES_MIN, G_PARAM_READWRITE));
   423           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   424   g_object_class_install_property (gobject_class, PROP_TIME_MIN,
   410   g_object_class_install_property (gobject_class, PROP_TIME_MIN,
   425       g_param_spec_int64 ("time-min", "Time min",
   411       g_param_spec_int64 ("time-min", "Time min",
   426           "min number of time to queue (-1 = as little as possible)", -1,
   412           "min number of time to queue (-1 = as little as possible)", -1,
   427           G_MAXINT64, DEFAULT_TIME_MIN,
   413           G_MAXINT64, DEFAULT_TIME_MIN, G_PARAM_READWRITE));
   428           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   429   g_object_class_install_property (gobject_class, PROP_BUFFERS_MIN,
   414   g_object_class_install_property (gobject_class, PROP_BUFFERS_MIN,
   430       g_param_spec_int ("buffers-min", "Buffers min",
   415       g_param_spec_int ("buffers-min", "Buffers min",
   431           "min number of buffers to queue (-1 = as few as possible)", -1,
   416           "min number of buffers to queue (-1 = as few as possible)", -1,
   432           G_MAXINT, DEFAULT_BUFFERS_MIN,
   417           G_MAXINT, DEFAULT_BUFFERS_MIN, G_PARAM_READWRITE));
   433           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   434 
   418 
   435   g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
   419   g_object_class_install_property (gobject_class, PROP_UNIT_TYPE,
   436       g_param_spec_enum ("unit-type", "Units type",
   420       g_param_spec_enum ("unit-type", "Units type",
   437           "The unit to measure the max/soft-max/queued properties",
   421           "The unit to measure the max/soft-max/queued properties",
   438           GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE,
   422           GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, G_PARAM_READWRITE));
   439           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   440   g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
   423   g_object_class_install_property (gobject_class, PROP_UNITS_MAX,
   441       g_param_spec_int64 ("units-max", "Units max",
   424       g_param_spec_int64 ("units-max", "Units max",
   442           "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
   425           "max number of units to queue (-1 = no limit)", -1, G_MAXINT64,
   443           DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   426           DEFAULT_UNITS_MAX, G_PARAM_READWRITE));
   444   g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
   427   g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX,
   445       g_param_spec_int64 ("units-soft-max", "Units soft max",
   428       g_param_spec_int64 ("units-soft-max", "Units soft max",
   446           "Recover client when going over this limit (-1 = no limit)", -1,
   429           "Recover client when going over this limit (-1 = no limit)", -1,
   447           G_MAXINT64, DEFAULT_UNITS_SOFT_MAX,
   430           G_MAXINT64, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE));
   448           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   449 
   431 
   450   g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED,
   432   g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED,
   451       g_param_spec_uint ("buffers-queued", "Buffers queued",
   433       g_param_spec_uint ("buffers-queued", "Buffers queued",
   452           "Number of buffers currently queued", 0, G_MAXUINT, 0,
   434           "Number of buffers currently queued", 0, G_MAXUINT, 0,
   453           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   435           G_PARAM_READABLE));
   454 #if NOT_IMPLEMENTED
   436 #if NOT_IMPLEMENTED
   455   g_object_class_install_property (gobject_class, PROP_BYTES_QUEUED,
   437   g_object_class_install_property (gobject_class, PROP_BYTES_QUEUED,
   456       g_param_spec_uint ("bytes-queued", "Bytes queued",
   438       g_param_spec_uint ("bytes-queued", "Bytes queued",
   457           "Number of bytes currently queued", 0, G_MAXUINT, 0,
   439           "Number of bytes currently queued", 0, G_MAXUINT, 0,
   458           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   440           G_PARAM_READABLE));
   459   g_object_class_install_property (gobject_class, PROP_TIME_QUEUED,
   441   g_object_class_install_property (gobject_class, PROP_TIME_QUEUED,
   460       g_param_spec_uint64 ("time-queued", "Time queued",
   442       g_param_spec_uint64 ("time-queued", "Time queued",
   461           "Number of time currently queued", 0, G_MAXUINT64, 0,
   443           "Number of time currently queued", 0, G_MAXUINT64, 0,
   462           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   444           G_PARAM_READABLE));
   463 #endif
   445 #endif
   464 
   446 
   465   g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY,
   447   g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY,
   466       g_param_spec_enum ("recover-policy", "Recover Policy",
   448       g_param_spec_enum ("recover-policy", "Recover Policy",
   467           "How to recover when client reaches the soft max",
   449           "How to recover when client reaches the soft max",
   468           GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY,
   450           GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE));
   469           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   470   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
   451   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
   471       g_param_spec_uint64 ("timeout", "Timeout",
   452       g_param_spec_uint64 ("timeout", "Timeout",
   472           "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
   453           "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)",
   473           0, G_MAXUINT64, DEFAULT_TIMEOUT,
   454           0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READWRITE));
   474           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   475   g_object_class_install_property (gobject_class, PROP_SYNC_METHOD,
   455   g_object_class_install_property (gobject_class, PROP_SYNC_METHOD,
   476       g_param_spec_enum ("sync-method", "Sync Method",
   456       g_param_spec_enum ("sync-method", "Sync Method",
   477           "How to sync new clients to the stream", GST_TYPE_SYNC_METHOD,
   457           "How to sync new clients to the stream",
   478           DEFAULT_SYNC_METHOD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   458           GST_TYPE_SYNC_METHOD, DEFAULT_SYNC_METHOD, G_PARAM_READWRITE));
   479   g_object_class_install_property (gobject_class, PROP_BYTES_TO_SERVE,
   459   g_object_class_install_property (gobject_class, PROP_BYTES_TO_SERVE,
   480       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
   460       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
   481           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
   461           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
   482           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   462           G_PARAM_READABLE));
   483   g_object_class_install_property (gobject_class, PROP_BYTES_SERVED,
   463   g_object_class_install_property (gobject_class, PROP_BYTES_SERVED,
   484       g_param_spec_uint64 ("bytes-served", "Bytes served",
   464       g_param_spec_uint64 ("bytes-served", "Bytes served",
   485           "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
   465           "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
   486           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   466           G_PARAM_READABLE));
   487 
   467 
   488   g_object_class_install_property (gobject_class, PROP_BURST_UNIT,
   468   g_object_class_install_property (gobject_class, PROP_BURST_UNIT,
   489       g_param_spec_enum ("burst-unit", "Burst unit",
   469       g_param_spec_enum ("burst-unit", "Burst unit",
   490           "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
   470           "The format of the burst units (when sync-method is burst[[-with]-keyframe])",
   491           GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT,
   471           GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT, G_PARAM_READWRITE));
   492           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   493   g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
   472   g_object_class_install_property (gobject_class, PROP_BURST_VALUE,
   494       g_param_spec_uint64 ("burst-value", "Burst value",
   473       g_param_spec_uint64 ("burst-value", "Burst value",
   495           "The amount of burst expressed in burst-unit", 0, G_MAXUINT64,
   474           "The amount of burst expressed in burst-unit",
   496           DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   475           0, G_MAXUINT64, DEFAULT_BURST_VALUE, G_PARAM_READWRITE));
   497 
       
   498   g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
       
   499       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
       
   500           "Quality of Service, differentiated services code point (-1 default)",
       
   501           -1, 63, DEFAULT_QOS_DSCP,
       
   502           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   503   /**
       
   504    * GstMultiFdSink::handle-read
       
   505    *
       
   506    * Handle read requests from clients and discard the data.
       
   507    *
       
   508    * Since: 0.10.23
       
   509    */
       
   510   g_object_class_install_property (gobject_class, PROP_HANDLE_READ,
       
   511       g_param_spec_boolean ("handle-read", "Handle Read",
       
   512           "Handle client reads and discard the data",
       
   513           DEFAULT_HANDLE_READ, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   514   /**
       
   515    * GstMultiFdSink::resend-streamheader
       
   516    *
       
   517    * Resend the streamheaders to existing clients when they change.
       
   518    *
       
   519    * Since: 0.10.23
       
   520    */
       
   521   g_object_class_install_property (gobject_class, PROP_RESEND_STREAMHEADER,
       
   522       g_param_spec_boolean ("resend-streamheader", "Resend streamheader",
       
   523           "Resend the streamheader if it changes in the caps",
       
   524           DEFAULT_RESEND_STREAMHEADER,
       
   525           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   526 
       
   527   g_object_class_install_property (gobject_class, PROP_NUM_FDS,
       
   528       g_param_spec_uint ("num-fds", "Number of fds",
       
   529           "The current number of client file descriptors.",
       
   530           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
       
   531 
   476 
   532   /**
   477   /**
   533    * GstMultiFdSink::add:
   478    * GstMultiFdSink::add:
   534    * @gstmultifdsink: the multifdsink element to emit this signal on
   479    * @gstmultifdsink: the multifdsink element to emit this signal on
   535    * @fd:             the file descriptor to add to multifdsink
   480    * @fd:             the file descriptor to add to multifdsink
   543       G_TYPE_INT);
   488       G_TYPE_INT);
   544   /**
   489   /**
   545    * GstMultiFdSink::add-full:
   490    * GstMultiFdSink::add-full:
   546    * @gstmultifdsink: the multifdsink element to emit this signal on
   491    * @gstmultifdsink: the multifdsink element to emit this signal on
   547    * @fd:             the file descriptor to add to multifdsink
   492    * @fd:             the file descriptor to add to multifdsink
   548    * @sync:           the sync method to use
   493    * @keyframe:       start bursting from a keyframe
   549    * @unit_type_min:  the unit-type of @value_min
   494    * @unit_type_min:  the unit-type of @value_min
   550    * @value_min:      the minimum amount of data to burst expressed in
   495    * @value_min:      the minimum amount of data to burst expressed in
   551    *                  @unit_type_min units.
   496    *                  @unit_type_min units.
   552    * @unit_type_max:  the unit-type of @value_max
   497    * @unit_type_max:  the unit-type of @value_max
   553    * @value_max:      the maximum amount of data to burst expressed in
   498    * @value_max:      the maximum amount of data to burst expressed in
   558    */
   503    */
   559   gst_multi_fd_sink_signals[SIGNAL_ADD_BURST] =
   504   gst_multi_fd_sink_signals[SIGNAL_ADD_BURST] =
   560       g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
   505       g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
   561       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
   506       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
   562           add_full), NULL, NULL,
   507           add_full), NULL, NULL,
   563       gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64, G_TYPE_NONE, 6,
   508       gst_tcp_marshal_VOID__INT_BOOLEAN_INT_UINT64_INT_UINT64, G_TYPE_NONE, 6,
   564       G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64,
   509       G_TYPE_INT, G_TYPE_BOOLEAN, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64,
   565       GST_TYPE_UNIT_TYPE, G_TYPE_UINT64);
   510       GST_TYPE_UNIT_TYPE, G_TYPE_UINT64);
   566   /**
   511   /**
   567    * GstMultiFdSink::remove:
   512    * GstMultiFdSink::remove:
   568    * @gstmultifdsink: the multifdsink element to emit this signal on
   513    * @gstmultifdsink: the multifdsink element to emit this signal on
   569    * @fd:             the file descriptor to remove from multifdsink
   514    * @fd:             the file descriptor to remove from multifdsink
   574       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
   519       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
   575       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
   520       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass,
   576           remove), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1,
   521           remove), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1,
   577       G_TYPE_INT);
   522       G_TYPE_INT);
   578   /**
   523   /**
   579    * GstMultiFdSink::remove-flush:
   524    * GstMultiFdSink::remove_flush:
   580    * @gstmultifdsink: the multifdsink element to emit this signal on
   525    * @gstmultifdsink: the multifdsink element to emit this signal on
   581    * @fd:             the file descriptor to remove from multifdsink
   526    * @fd:             the file descriptor to remove from multifdsink
   582    *
   527    *
   583    * Remove the given open file descriptor from multifdsink after flushing all
   528    * Remove the given open file descriptor from multifdsink after flushing all
   584    * the pending data to the fd.
   529    * the pending data to the fd.
   715   this->timeout = DEFAULT_TIMEOUT;
   660   this->timeout = DEFAULT_TIMEOUT;
   716   this->def_sync_method = DEFAULT_SYNC_METHOD;
   661   this->def_sync_method = DEFAULT_SYNC_METHOD;
   717   this->def_burst_unit = DEFAULT_BURST_UNIT;
   662   this->def_burst_unit = DEFAULT_BURST_UNIT;
   718   this->def_burst_value = DEFAULT_BURST_VALUE;
   663   this->def_burst_value = DEFAULT_BURST_VALUE;
   719 
   664 
   720   this->qos_dscp = DEFAULT_QOS_DSCP;
       
   721   this->handle_read = DEFAULT_HANDLE_READ;
       
   722 
       
   723   this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER;
       
   724 
       
   725   this->header_flags = 0;
   665   this->header_flags = 0;
   726 }
   666 }
   727 
   667 
   728 static void
   668 static void
   729 gst_multi_fd_sink_finalize (GObject * object)
   669 gst_multi_fd_sink_finalize (GObject * object)
   735   CLIENTS_LOCK_FREE (this);
   675   CLIENTS_LOCK_FREE (this);
   736   g_hash_table_destroy (this->fd_hash);
   676   g_hash_table_destroy (this->fd_hash);
   737   g_array_free (this->bufqueue, TRUE);
   677   g_array_free (this->bufqueue, TRUE);
   738 
   678 
   739   G_OBJECT_CLASS (parent_class)->finalize (object);
   679   G_OBJECT_CLASS (parent_class)->finalize (object);
   740 }
       
   741 
       
   742 static gint
       
   743 setup_dscp_client (GstMultiFdSink * sink, GstTCPClient * client)
       
   744 {
       
   745   gint tos;
       
   746   gint ret;
       
   747   union gst_sockaddr
       
   748   {
       
   749     struct sockaddr sa;
       
   750     struct sockaddr_in6 sa_in6;
       
   751     struct sockaddr_storage sa_stor;
       
   752   } sa;
       
   753   socklen_t slen = sizeof (sa);
       
   754   gint af;
       
   755 
       
   756   /* don't touch */
       
   757   if (sink->qos_dscp < 0)
       
   758     return 0;
       
   759 
       
   760   if ((ret = getsockname (client->fd.fd, &sa.sa, &slen)) < 0) {
       
   761     GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
       
   762     return ret;
       
   763   }
       
   764 
       
   765   af = sa.sa.sa_family;
       
   766 
       
   767   /* if this is an IPv4-mapped address then do IPv4 QoS */
       
   768   if (af == AF_INET6) {
       
   769 
       
   770     GST_DEBUG_OBJECT (sink, "check IP6 socket");
       
   771     if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
       
   772       GST_DEBUG_OBJECT (sink, "mapped to IPV4");
       
   773       af = AF_INET;
       
   774     }
       
   775   }
       
   776 
       
   777   /* extract and shift 6 bits of the DSCP */
       
   778   tos = (sink->qos_dscp & 0x3f) << 2;
       
   779 
       
   780   switch (af) {
       
   781     case AF_INET:
       
   782       ret = setsockopt (client->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos));
       
   783       break;
       
   784     case AF_INET6:
       
   785 #ifdef IPV6_TCLASS
       
   786       ret =
       
   787           setsockopt (client->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos,
       
   788           sizeof (tos));
       
   789       break;
       
   790 #endif
       
   791     default:
       
   792       ret = 0;
       
   793       GST_ERROR_OBJECT (sink, "unsupported AF");
       
   794       break;
       
   795   }
       
   796   if (ret)
       
   797     GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
       
   798 
       
   799   return ret;
       
   800 }
       
   801 
       
   802 
       
   803 static void
       
   804 setup_dscp (GstMultiFdSink * sink)
       
   805 {
       
   806   GList *clients, *next;
       
   807 
       
   808   CLIENTS_LOCK (sink);
       
   809   for (clients = sink->clients; clients; clients = next) {
       
   810     GstTCPClient *client;
       
   811 
       
   812     client = (GstTCPClient *) clients->data;
       
   813     next = g_list_next (clients);
       
   814 
       
   815     setup_dscp_client (sink, client);
       
   816   }
       
   817   CLIENTS_UNLOCK (sink);
       
   818 }
   680 }
   819 
   681 
   820 /* "add-full" signal implementation */
   682 /* "add-full" signal implementation */
   821 #ifdef __SYMBIAN32__
   683 #ifdef __SYMBIAN32__
   822 EXPORT_C
   684 EXPORT_C
   823 #endif
   685 #endif
   824 
   686 
   825 void
   687 void
   826 gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
   688 gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
   827     GstSyncMethod sync_method, GstTCPUnitType min_unit, guint64 min_value,
   689     GstSyncMethod sync_method, GstUnitType min_unit, guint64 min_value,
   828     GstTCPUnitType max_unit, guint64 max_value)
   690     GstUnitType max_unit, guint64 max_value)
   829 {
   691 {
   830   GstTCPClient *client;
   692   GstTCPClient *client;
   831   GList *clink;
   693   GList *clink;
   832   GTimeVal now;
   694   GTimeVal now;
   833   gint flags, res;
   695   gint flags, res;
   881   clink = sink->clients = g_list_prepend (sink->clients, client);
   743   clink = sink->clients = g_list_prepend (sink->clients, client);
   882   g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
   744   g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink);
   883   sink->clients_cookie++;
   745   sink->clients_cookie++;
   884 
   746 
   885   /* set the socket to non blocking */
   747   /* set the socket to non blocking */
   886 //  temporary  fix for multifdsink
   748 //temporary commented for multifdsink issue  
   887   //res = fcntl (fd, F_SETFL, O_NONBLOCK);
   749   //res = fcntl (fd, F_SETFL, O_NONBLOCK);  //Arun
       
   750   
   888   /* we always read from a client */
   751   /* we always read from a client */
   889   gst_poll_add_fd (sink->fdset, &client->fd);
   752   gst_poll_add_fd (sink->fdset, &client->fd);
   890 
   753 
   891   /* we don't try to read from write only fds */
   754   /* we don't try to read from write only fds */
   892   if (sink->handle_read) {
   755   flags = fcntl (fd, F_GETFL, 0);
   893     flags = fcntl (fd, F_GETFL, 0);
   756   if ((flags & O_ACCMODE) != O_WRONLY) {
   894     if ((flags & O_ACCMODE) != O_WRONLY) {
   757     gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE);
   895       gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE);
       
   896     }
       
   897   }
   758   }
   898   /* figure out the mode, can't use send() for non sockets */
   759   /* figure out the mode, can't use send() for non sockets */
   899   res = fstat (fd, &statbuf);
   760   res = fstat (fd, &statbuf);
   900   if (S_ISSOCK (statbuf.st_mode)) {
   761   if (S_ISSOCK (statbuf.st_mode)) {
   901     client->is_socket = TRUE;
   762     client->is_socket = TRUE;
   902     setup_dscp_client (sink, client);
       
   903   }
   763   }
   904 
   764 
   905   gst_poll_restart (sink->fdset);
   765   gst_poll_restart (sink->fdset);
   906 
   766 
   907   CLIENTS_UNLOCK (sink);
   767   CLIENTS_UNLOCK (sink);
  1417               "[fd %5d] previous caps did not have streamheader, sending",
  1277               "[fd %5d] previous caps did not have streamheader, sending",
  1418               client->fd.fd);
  1278               client->fd.fd);
  1419           send_streamheader = TRUE;
  1279           send_streamheader = TRUE;
  1420         } else {
  1280         } else {
  1421           /* both old and new caps have streamheader set */
  1281           /* both old and new caps have streamheader set */
  1422           if (!sink->resend_streamheader) {
  1282           sh1 = gst_structure_get_value (s, "streamheader");
       
  1283           s = gst_caps_get_structure (caps, 0);
       
  1284           sh2 = gst_structure_get_value (s, "streamheader");
       
  1285           if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
  1423             GST_DEBUG_OBJECT (sink,
  1286             GST_DEBUG_OBJECT (sink,
  1424                 "[fd %5d] asked to not resend the streamheader, not sending",
  1287                 "[fd %5d] new streamheader different from old, sending",
  1425                 client->fd.fd);
  1288                 client->fd.fd);
  1426             send_streamheader = FALSE;
  1289             send_streamheader = TRUE;
  1427           } else {
       
  1428             sh1 = gst_structure_get_value (s, "streamheader");
       
  1429             s = gst_caps_get_structure (caps, 0);
       
  1430             sh2 = gst_structure_get_value (s, "streamheader");
       
  1431             if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
       
  1432               GST_DEBUG_OBJECT (sink,
       
  1433                   "[fd %5d] new streamheader different from old, sending",
       
  1434                   client->fd.fd);
       
  1435               send_streamheader = TRUE;
       
  1436             }
       
  1437           }
  1290           }
  1438         }
  1291         }
  1439       }
  1292       }
  1440     }
  1293     }
  1441     /* Replace the old caps */
  1294     /* Replace the old caps */
  1559  * queue to satisfy the limit, return len(queue) + 1 */
  1412  * queue to satisfy the limit, return len(queue) + 1 */
  1560 static gint
  1413 static gint
  1561 get_buffers_max (GstMultiFdSink * sink, gint64 max)
  1414 get_buffers_max (GstMultiFdSink * sink, gint64 max)
  1562 {
  1415 {
  1563   switch (sink->unit_type) {
  1416   switch (sink->unit_type) {
  1564     case GST_TCP_UNIT_TYPE_BUFFERS:
  1417     case GST_UNIT_TYPE_BUFFERS:
  1565       return max;
  1418       return max;
  1566     case GST_TCP_UNIT_TYPE_TIME:
  1419     case GST_UNIT_TYPE_TIME:
  1567     {
  1420     {
  1568       GstBuffer *buf;
  1421       GstBuffer *buf;
  1569       int i;
  1422       int i;
  1570       int len;
  1423       int len;
  1571       gint64 diff;
  1424       gint64 diff;
  1585             return i + 1;
  1438             return i + 1;
  1586         }
  1439         }
  1587       }
  1440       }
  1588       return len + 1;
  1441       return len + 1;
  1589     }
  1442     }
  1590     case GST_TCP_UNIT_TYPE_BYTES:
  1443     case GST_UNIT_TYPE_BYTES:
  1591     {
  1444     {
  1592       GstBuffer *buf;
  1445       GstBuffer *buf;
  1593       int i;
  1446       int i;
  1594       int len;
  1447       int len;
  1595       gint acc = 0;
  1448       gint acc = 0;
  1728  * right type, leave the other values untouched 
  1581  * right type, leave the other values untouched 
  1729  *
  1582  *
  1730  * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
  1583  * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise.
  1731  */
  1584  */
  1732 static gboolean
  1585 static gboolean
  1733 assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers,
  1586 assign_value (GstUnitType unit, guint64 value, gint * bytes, gint * buffers,
  1734     GstClockTime * time)
  1587     GstClockTime * time)
  1735 {
  1588 {
  1736   gboolean res = TRUE;
  1589   gboolean res = TRUE;
  1737 
  1590 
  1738   /* set only the limit of the given format to the given value */
  1591   /* set only the limit of the given format to the given value */
  1739   switch (unit) {
  1592   switch (unit) {
  1740     case GST_TCP_UNIT_TYPE_BUFFERS:
  1593     case GST_UNIT_TYPE_BUFFERS:
  1741       *buffers = (gint) value;
  1594       *buffers = (gint) value;
  1742       break;
  1595       break;
  1743     case GST_TCP_UNIT_TYPE_TIME:
  1596     case GST_UNIT_TYPE_TIME:
  1744       *time = value;
  1597       *time = value;
  1745       break;
  1598       break;
  1746     case GST_TCP_UNIT_TYPE_BYTES:
  1599     case GST_UNIT_TYPE_BYTES:
  1747       *bytes = (gint) value;
  1600       *bytes = (gint) value;
  1748       break;
  1601       break;
  1749     case GST_TCP_UNIT_TYPE_UNDEFINED:
  1602     case GST_UNIT_TYPE_UNDEFINED:
  1750     default:
  1603     default:
  1751       res = FALSE;
  1604       res = FALSE;
  1752       break;
  1605       break;
  1753   }
  1606   }
  1754   return res;
  1607   return res;
  1761  * burst values. @idx contains the index in the buffer that contains enough
  1614  * burst values. @idx contains the index in the buffer that contains enough
  1762  * data to satisfy the limits or the last buffer in the queue when the
  1615  * data to satisfy the limits or the last buffer in the queue when the
  1763  * function returns FALSE.
  1616  * function returns FALSE.
  1764  */
  1617  */
  1765 static gboolean
  1618 static gboolean
  1766 count_burst_unit (GstMultiFdSink * sink, gint * min_idx,
  1619 count_burst_unit (GstMultiFdSink * sink, gint * min_idx, GstUnitType min_unit,
  1767     GstTCPUnitType min_unit, guint64 min_value, gint * max_idx,
  1620     guint64 min_value, gint * max_idx, GstUnitType max_unit, guint64 max_value)
  1768     GstTCPUnitType max_unit, guint64 max_value)
       
  1769 {
  1621 {
  1770   gint bytes_min = -1, buffers_min = -1;
  1622   gint bytes_min = -1, buffers_min = -1;
  1771   gint bytes_max = -1, buffers_max = -1;
  1623   gint bytes_max = -1, buffers_max = -1;
  1772   GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
  1624   GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE;
  1773 
  1625 
  2384     /* no point in searching beyond the queue length */
  2236     /* no point in searching beyond the queue length */
  2385     gint limit = queuelen;
  2237     gint limit = queuelen;
  2386     GstBuffer *buf;
  2238     GstBuffer *buf;
  2387 
  2239 
  2388     /* no point in searching beyond the soft-max if any. */
  2240     /* no point in searching beyond the soft-max if any. */
  2389     if (soft_max_buffers > 0) {
  2241     if (soft_max_buffers) {
  2390       limit = MIN (limit, soft_max_buffers);
  2242       limit = MIN (limit, soft_max_buffers);
  2391     }
  2243     }
  2392     GST_LOG_OBJECT (sink,
  2244     GST_LOG_OBJECT (sink, "extending queue to include sync point, now at %d",
  2393         "extending queue to include sync point, now at %d, limit is %d",
  2245         max_buffer_usage);
  2394         max_buffer_usage, limit);
       
  2395     for (i = 0; i < limit; i++) {
  2246     for (i = 0; i < limit; i++) {
  2396       buf = g_array_index (sink->bufqueue, GstBuffer *, i);
  2247       buf = g_array_index (sink->bufqueue, GstBuffer *, i);
  2397       if (is_sync_frame (sink, buf)) {
  2248       if (is_sync_frame (sink, buf)) {
  2398         /* found a sync frame, now extend the buffer usage to
  2249         /* found a sync frame, now extend the buffer usage to
  2399          * include at least this frame. */
  2250          * include at least this frame. */
  2489           if (res == -1) {
  2340           if (res == -1) {
  2490             GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s (%d)",
  2341             GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s (%d)",
  2491                 fd, g_strerror (errno), errno);
  2342                 fd, g_strerror (errno), errno);
  2492             if (errno == EBADF) {
  2343             if (errno == EBADF) {
  2493               client->status = GST_CLIENT_STATUS_ERROR;
  2344               client->status = GST_CLIENT_STATUS_ERROR;
  2494               /* releases the CLIENTS lock */
       
  2495               gst_multi_fd_sink_remove_client_link (sink, clients);
  2345               gst_multi_fd_sink_remove_client_link (sink, clients);
  2496             }
  2346             }
  2497           }
  2347           }
  2498         }
  2348         }
  2499         CLIENTS_UNLOCK (sink);
  2349         CLIENTS_UNLOCK (sink);
  2735       multifdsink->def_burst_unit = g_value_get_enum (value);
  2585       multifdsink->def_burst_unit = g_value_get_enum (value);
  2736       break;
  2586       break;
  2737     case PROP_BURST_VALUE:
  2587     case PROP_BURST_VALUE:
  2738       multifdsink->def_burst_value = g_value_get_uint64 (value);
  2588       multifdsink->def_burst_value = g_value_get_uint64 (value);
  2739       break;
  2589       break;
  2740     case PROP_QOS_DSCP:
       
  2741       multifdsink->qos_dscp = g_value_get_int (value);
       
  2742       setup_dscp (multifdsink);
       
  2743       break;
       
  2744     case PROP_HANDLE_READ:
       
  2745       multifdsink->handle_read = g_value_get_boolean (value);
       
  2746       break;
       
  2747     case PROP_RESEND_STREAMHEADER:
       
  2748       multifdsink->resend_streamheader = g_value_get_boolean (value);
       
  2749       break;
       
  2750 
  2590 
  2751     default:
  2591     default:
  2752       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
  2592       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
  2753       break;
  2593       break;
  2754   }
  2594   }
  2820     case PROP_BURST_UNIT:
  2660     case PROP_BURST_UNIT:
  2821       g_value_set_enum (value, multifdsink->def_burst_unit);
  2661       g_value_set_enum (value, multifdsink->def_burst_unit);
  2822       break;
  2662       break;
  2823     case PROP_BURST_VALUE:
  2663     case PROP_BURST_VALUE:
  2824       g_value_set_uint64 (value, multifdsink->def_burst_value);
  2664       g_value_set_uint64 (value, multifdsink->def_burst_value);
  2825       break;
       
  2826     case PROP_QOS_DSCP:
       
  2827       g_value_set_int (value, multifdsink->qos_dscp);
       
  2828       break;
       
  2829     case PROP_HANDLE_READ:
       
  2830       g_value_set_boolean (value, multifdsink->handle_read);
       
  2831       break;
       
  2832     case PROP_RESEND_STREAMHEADER:
       
  2833       g_value_set_boolean (value, multifdsink->resend_streamheader);
       
  2834       break;
       
  2835     case PROP_NUM_FDS:
       
  2836       g_value_set_uint (value, g_hash_table_size (multifdsink->fd_hash));
       
  2837       break;
  2665       break;
  2838 
  2666 
  2839     default:
  2667     default:
  2840       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
  2668       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
  2841       break;
  2669       break;