--- a/gst_plugins_base/gst-libs/gst/rtp/gstbasertpdepayload.c Wed Mar 24 17:58:42 2010 -0500
+++ b/gst_plugins_base/gst-libs/gst/rtp/gstbasertpdepayload.c Wed Mar 24 18:04:17 2010 -0500
@@ -56,6 +56,10 @@
gboolean discont;
GstClockTime timestamp;
GstClockTime duration;
+
+ guint32 next_seqnum;
+
+ gboolean negotiated;
};
/* Filter signals and args */
@@ -70,7 +74,8 @@
enum
{
PROP_0,
- PROP_QUEUE_DELAY
+ PROP_QUEUE_DELAY,
+ PROP_LAST
};
static void gst_base_rtp_depayload_finalize (GObject * object);
@@ -90,6 +95,8 @@
static void gst_base_rtp_depayload_set_gst_timestamp
(GstBaseRTPDepayload * filter, guint32 rtptime, GstBuffer * buf);
+static gboolean gst_base_rtp_depayload_packet_lost (GstBaseRTPDepayload *
+ filter, GstEvent * event);
GST_BOILERPLATE (GstBaseRTPDepayload, gst_base_rtp_depayload, GstElement,
GST_TYPE_ELEMENT);
@@ -128,12 +135,13 @@
g_object_class_install_property (gobject_class, PROP_QUEUE_DELAY,
g_param_spec_uint ("queue-delay", "Queue Delay",
"Amount of ms to queue/buffer, deprecated", 0, G_MAXUINT,
- DEFAULT_QUEUE_DELAY, G_PARAM_READWRITE));
+ DEFAULT_QUEUE_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
#endif
gstelement_class->change_state = gst_base_rtp_depayload_change_state;
klass->set_gst_timestamp = gst_base_rtp_depayload_set_gst_timestamp;
+ klass->packet_lost = gst_base_rtp_depayload_packet_lost;
GST_DEBUG_CATEGORY_INIT (basertpdepayload_debug, "basertpdepayload", 0,
"Base class for RTP Depayloaders");
@@ -237,6 +245,8 @@
else
res = TRUE;
+ priv->negotiated = res;
+
gst_object_unref (filter);
return res;
@@ -251,29 +261,89 @@
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *out_buf;
GstClockTime timestamp;
+ guint16 seqnum;
+ guint32 rtptime;
+ gboolean reset_seq, discont;
+ gint gap;
filter = GST_BASE_RTP_DEPAYLOAD (GST_OBJECT_PARENT (pad));
+ priv = filter->priv;
- priv = filter->priv;
+ /* we must have a setcaps first */
+ if (G_UNLIKELY (!priv->negotiated))
+ goto not_negotiated;
+
+ /* we must validate, it's possible that this element is plugged right after a
+ * network receiver and we don't want to operate on invalid data */
+ if (G_UNLIKELY (!gst_rtp_buffer_validate (in)))
+ goto invalid_buffer;
+
priv->discont = GST_BUFFER_IS_DISCONT (in);
+ timestamp = GST_BUFFER_TIMESTAMP (in);
/* convert to running_time and save the timestamp, this is the timestamp
* we put on outgoing buffers. */
- timestamp = GST_BUFFER_TIMESTAMP (in);
timestamp = gst_segment_to_running_time (&filter->segment, GST_FORMAT_TIME,
timestamp);
priv->timestamp = timestamp;
priv->duration = GST_BUFFER_DURATION (in);
+ seqnum = gst_rtp_buffer_get_seq (in);
+ rtptime = gst_rtp_buffer_get_timestamp (in);
+ reset_seq = TRUE;
+ discont = FALSE;
+
+ GST_LOG_OBJECT (filter, "discont %d, seqnum %u, rtptime %u, timestamp %"
+ GST_TIME_FORMAT, priv->discont, seqnum, rtptime,
+ GST_TIME_ARGS (timestamp));
+
+ /* Check seqnum. This is a very simple check that makes sure that the seqnums
+ * are striclty increasing, dropping anything that is out of the ordinary. We
+ * can only do this when the next_seqnum is known. */
+ if (G_LIKELY (priv->next_seqnum != -1)) {
+ gap = gst_rtp_buffer_compare_seqnum (seqnum, priv->next_seqnum);
+
+ /* if we have no gap, all is fine */
+ if (G_UNLIKELY (gap != 0)) {
+ GST_LOG_OBJECT (filter, "got packet %u, expected %u, gap %d", seqnum,
+ priv->next_seqnum, gap);
+ if (gap < 0) {
+ /* seqnum > next_seqnum, we are missing some packets, this is always a
+ * DISCONT. */
+ GST_LOG_OBJECT (filter, "%d missing packets", gap);
+ discont = TRUE;
+ } else {
+ /* seqnum < next_seqnum, we have seen this packet before or the sender
+ * could be restarted. If the packet is not too old, we throw it away as
+ * a duplicate, otherwise we mark discont and continue. 100 misordered
+ * packets is a good threshold. See also RFC 4737. */
+ if (gap < 100)
+ goto dropping;
+
+ GST_LOG_OBJECT (filter,
+ "%d > 100, packet too old, sender likely restarted", gap);
+ discont = TRUE;
+ }
+ }
+ }
+ priv->next_seqnum = (seqnum + 1) & 0xffff;
+
+ if (G_UNLIKELY (discont && !priv->discont)) {
+ GST_LOG_OBJECT (filter, "mark DISCONT on input buffer");
+ /* we detected a seqnum discont but the buffer was not flagged with a discont,
+ * set the discont flag so that the subclass can throw away old data. */
+ priv->discont = TRUE;
+ GST_BUFFER_FLAG_SET (in, GST_BUFFER_FLAG_DISCONT);
+ }
+
bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
+ if (G_UNLIKELY (bclass->process == NULL))
+ goto no_process;
+
/* let's send it out to processing */
out_buf = bclass->process (filter, in);
if (out_buf) {
- guint32 rtptime;
-
- rtptime = gst_rtp_buffer_get_timestamp (in);
-
/* we pass rtptime as backward compatibility, in reality, the incomming
* buffer timestamp is always applied to the outgoing packet. */
ret = gst_base_rtp_depayload_push_ts (filter, rtptime, out_buf);
@@ -281,6 +351,38 @@
gst_buffer_unref (in);
return ret;
+
+ /* ERRORS */
+not_negotiated:
+ {
+ /* this is not fatal but should be filtered earlier */
+ GST_ELEMENT_ERROR (filter, CORE, NEGOTIATION, (NULL),
+ ("Not RTP format was negotiated"));
+ gst_buffer_unref (in);
+ return GST_FLOW_NOT_NEGOTIATED;
+ }
+invalid_buffer:
+ {
+ /* this is not fatal but should be filtered earlier */
+ GST_ELEMENT_WARNING (filter, STREAM, DECODE, (NULL),
+ ("Received invalid RTP payload, dropping"));
+ gst_buffer_unref (in);
+ return GST_FLOW_OK;
+ }
+dropping:
+ {
+ GST_WARNING_OBJECT (filter, "%d <= 100, dropping old packet", gap);
+ gst_buffer_unref (in);
+ return GST_FLOW_OK;
+ }
+no_process:
+ {
+ /* this is not fatal but should be filtered earlier */
+ GST_ELEMENT_ERROR (filter, STREAM, NOT_IMPLEMENTED, (NULL),
+ ("The subclass does not have a process method"));
+ gst_buffer_unref (in);
+ return GST_FLOW_ERROR;
+ }
}
static gboolean
@@ -297,6 +399,7 @@
gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED);
filter->need_newsegment = TRUE;
+ filter->priv->next_seqnum = -1;
break;
case GST_EVENT_NEWSEGMENT:
{
@@ -316,6 +419,29 @@
gst_event_unref (event);
break;
}
+ case GST_EVENT_CUSTOM_DOWNSTREAM:
+ {
+ GstBaseRTPDepayloadClass *bclass;
+
+ bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
+
+ if (gst_event_has_name (event, "GstRTPPacketLost")) {
+ /* we get this event from the jitterbuffer when it considers a packet as
+ * being lost. We send it to our packet_lost vmethod. The default
+ * implementation will make time progress by pushing out a NEWSEGMENT
+ * update event. Subclasses can override and to one of the following:
+ * - Adjust timestamp/duration to something more accurate before
+ * calling the parent (default) packet_lost method.
+ * - do some more advanced error concealing on the already received
+ * (fragmented) packets.
+ * - ignore the packet lost.
+ */
+ if (bclass->packet_lost)
+ res = bclass->packet_lost (filter, event);
+ }
+ gst_event_unref (event);
+ break;
+ }
default:
/* pass other events forward */
res = gst_pad_push_event (filter->srcpad, event);
@@ -324,6 +450,28 @@
return res;
}
+static GstEvent *
+create_segment_event (GstBaseRTPDepayload * filter, gboolean update,
+ GstClockTime position)
+{
+ GstEvent *event;
+ GstClockTime stop;
+ GstBaseRTPDepayloadPrivate *priv;
+
+ priv = filter->priv;
+
+ if (priv->npt_stop != -1)
+ stop = priv->npt_stop - priv->npt_start;
+ else
+ stop = -1;
+
+ event = gst_event_new_new_segment_full (update, priv->play_speed,
+ priv->play_scale, GST_FORMAT_TIME, position, stop,
+ position + priv->npt_start);
+
+ return event;
+}
+
static GstFlowReturn
gst_base_rtp_depayload_push_full (GstBaseRTPDepayload * filter,
gboolean do_ts, guint32 rtptime, GstBuffer * out_buf)
@@ -337,7 +485,7 @@
/* set the caps if any */
srccaps = GST_PAD_CAPS (filter->srcpad);
- if (srccaps)
+ if (G_LIKELY (srccaps))
gst_buffer_set_caps (out_buf, srccaps);
bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
@@ -346,7 +494,20 @@
if (bclass->set_gst_timestamp && do_ts)
bclass->set_gst_timestamp (filter, rtptime, out_buf);
- if (priv->discont) {
+ /* if this is the first buffer send a NEWSEGMENT */
+ if (G_UNLIKELY (filter->need_newsegment)) {
+ GstEvent *event;
+
+ event = create_segment_event (filter, FALSE, 0);
+
+ gst_pad_push_event (filter->srcpad, event);
+
+ filter->need_newsegment = FALSE;
+ GST_DEBUG_OBJECT (filter, "Pushed newsegment event on this first buffer");
+ }
+
+ if (G_UNLIKELY (priv->discont)) {
+ GST_LOG_OBJECT (filter, "Marking DISCONT on output buffer");
GST_BUFFER_FLAG_SET (out_buf, GST_BUFFER_FLAG_DISCONT);
priv->discont = FALSE;
}
@@ -355,8 +516,8 @@
GST_LOG_OBJECT (filter, "Pushing buffer size %d, timestamp %" GST_TIME_FORMAT,
GST_BUFFER_SIZE (out_buf),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (out_buf)));
+
ret = gst_pad_push (filter->srcpad, out_buf);
- GST_LOG_OBJECT (filter, "Pushed buffer: %s", gst_flow_get_name (ret));
return ret;
}
@@ -410,6 +571,38 @@
return gst_base_rtp_depayload_push_full (filter, FALSE, 0, out_buf);
}
+/* convert the PacketLost event form a jitterbuffer to a segment update.
+ * subclasses can override this. */
+static gboolean
+gst_base_rtp_depayload_packet_lost (GstBaseRTPDepayload * filter,
+ GstEvent * event)
+{
+ GstBaseRTPDepayloadPrivate *priv;
+ GstClockTime timestamp, duration, position;
+ GstEvent *sevent;
+ const GstStructure *s;
+
+ priv = filter->priv;
+
+ s = gst_event_get_structure (event);
+
+ /* first start by parsing the timestamp and duration */
+ timestamp = -1;
+ duration = -1;
+
+ gst_structure_get_clock_time (s, "timestamp", ×tamp);
+ gst_structure_get_clock_time (s, "duration", &duration);
+
+ position = timestamp;
+ if (duration != -1)
+ position += duration;
+
+ /* update the current segment with the elapsed time */
+ sevent = create_segment_event (filter, TRUE, position);
+
+ return gst_pad_push_event (filter->srcpad, sevent);
+}
+
static void
gst_base_rtp_depayload_set_gst_timestamp (GstBaseRTPDepayload * filter,
guint32 rtptime, GstBuffer * buf)
@@ -428,28 +621,6 @@
GST_BUFFER_TIMESTAMP (buf) = priv->timestamp;
if (!GST_CLOCK_TIME_IS_VALID (duration))
GST_BUFFER_DURATION (buf) = priv->duration;
-
- /* if this is the first buffer send a NEWSEGMENT */
- if (filter->need_newsegment) {
- GstEvent *event;
- GstClockTime stop, position;
-
- if (priv->npt_stop != -1)
- stop = priv->npt_stop - priv->npt_start;
- else
- stop = -1;
-
- position = priv->npt_start;
-
- event =
- gst_event_new_new_segment_full (FALSE, priv->play_speed,
- priv->play_scale, GST_FORMAT_TIME, 0, stop, position);
-
- gst_pad_push_event (filter->srcpad, event);
-
- filter->need_newsegment = FALSE;
- GST_DEBUG_OBJECT (filter, "Pushed newsegment event on this first buffer");
- }
}
static GstStateChangeReturn
@@ -472,6 +643,8 @@
priv->npt_stop = -1;
priv->play_speed = 1.0;
priv->play_scale = 1.0;
+ priv->next_seqnum = -1;
+ priv->negotiated = FALSE;
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;