gst_plugins_base/gst-libs/gst/rtp/gstbasertpdepayload.c
branchRCL_3
changeset 29 567bb019e3e3
parent 0 0e761a78d257
child 30 7e817e7e631c
--- a/gst_plugins_base/gst-libs/gst/rtp/gstbasertpdepayload.c	Wed Mar 31 22:03:18 2010 +0300
+++ b/gst_plugins_base/gst-libs/gst/rtp/gstbasertpdepayload.c	Tue Aug 31 15:30:33 2010 +0300
@@ -56,6 +56,10 @@
   gboolean discont;
   GstClockTime timestamp;
   GstClockTime duration;
+
+  guint32 next_seqnum;
+
+  gboolean negotiated;
 };
 
 /* Filter signals and args */
@@ -70,7 +74,8 @@
 enum
 {
   PROP_0,
-  PROP_QUEUE_DELAY
+  PROP_QUEUE_DELAY,
+  PROP_LAST
 };
 
 static void gst_base_rtp_depayload_finalize (GObject * object);
@@ -90,6 +95,8 @@
 
 static void gst_base_rtp_depayload_set_gst_timestamp
     (GstBaseRTPDepayload * filter, guint32 rtptime, GstBuffer * buf);
+static gboolean gst_base_rtp_depayload_packet_lost (GstBaseRTPDepayload *
+    filter, GstEvent * event);
 
 GST_BOILERPLATE (GstBaseRTPDepayload, gst_base_rtp_depayload, GstElement,
     GST_TYPE_ELEMENT);
@@ -128,12 +135,13 @@
   g_object_class_install_property (gobject_class, PROP_QUEUE_DELAY,
       g_param_spec_uint ("queue-delay", "Queue Delay",
           "Amount of ms to queue/buffer, deprecated", 0, G_MAXUINT,
-          DEFAULT_QUEUE_DELAY, G_PARAM_READWRITE));
+          DEFAULT_QUEUE_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 #endif
 
   gstelement_class->change_state = gst_base_rtp_depayload_change_state;
 
   klass->set_gst_timestamp = gst_base_rtp_depayload_set_gst_timestamp;
+  klass->packet_lost = gst_base_rtp_depayload_packet_lost;
 
   GST_DEBUG_CATEGORY_INIT (basertpdepayload_debug, "basertpdepayload", 0,
       "Base class for RTP Depayloaders");
@@ -237,6 +245,8 @@
   else
     res = TRUE;
 
+  priv->negotiated = res;
+
   gst_object_unref (filter);
 
   return res;
@@ -251,29 +261,89 @@
   GstFlowReturn ret = GST_FLOW_OK;
   GstBuffer *out_buf;
   GstClockTime timestamp;
+  guint16 seqnum;
+  guint32 rtptime;
+  gboolean reset_seq, discont;
+  gint gap;
 
   filter = GST_BASE_RTP_DEPAYLOAD (GST_OBJECT_PARENT (pad));
+  priv = filter->priv;
 
-  priv = filter->priv;
+  /* we must have a setcaps first */
+  if (G_UNLIKELY (!priv->negotiated))
+    goto not_negotiated;
+
+  /* we must validate, it's possible that this element is plugged right after a
+   * network receiver and we don't want to operate on invalid data */
+  if (G_UNLIKELY (!gst_rtp_buffer_validate (in)))
+    goto invalid_buffer;
+
   priv->discont = GST_BUFFER_IS_DISCONT (in);
 
+  timestamp = GST_BUFFER_TIMESTAMP (in);
   /* convert to running_time and save the timestamp, this is the timestamp
    * we put on outgoing buffers. */
-  timestamp = GST_BUFFER_TIMESTAMP (in);
   timestamp = gst_segment_to_running_time (&filter->segment, GST_FORMAT_TIME,
       timestamp);
   priv->timestamp = timestamp;
   priv->duration = GST_BUFFER_DURATION (in);
 
+  seqnum = gst_rtp_buffer_get_seq (in);
+  rtptime = gst_rtp_buffer_get_timestamp (in);
+  reset_seq = TRUE;
+  discont = FALSE;
+
+  GST_LOG_OBJECT (filter, "discont %d, seqnum %u, rtptime %u, timestamp %"
+      GST_TIME_FORMAT, priv->discont, seqnum, rtptime,
+      GST_TIME_ARGS (timestamp));
+
+  /* Check seqnum. This is a very simple check that makes sure that the seqnums
+   * are striclty increasing, dropping anything that is out of the ordinary. We
+   * can only do this when the next_seqnum is known. */
+  if (G_LIKELY (priv->next_seqnum != -1)) {
+    gap = gst_rtp_buffer_compare_seqnum (seqnum, priv->next_seqnum);
+
+    /* if we have no gap, all is fine */
+    if (G_UNLIKELY (gap != 0)) {
+      GST_LOG_OBJECT (filter, "got packet %u, expected %u, gap %d", seqnum,
+          priv->next_seqnum, gap);
+      if (gap < 0) {
+        /* seqnum > next_seqnum, we are missing some packets, this is always a
+         * DISCONT. */
+        GST_LOG_OBJECT (filter, "%d missing packets", gap);
+        discont = TRUE;
+      } else {
+        /* seqnum < next_seqnum, we have seen this packet before or the sender
+         * could be restarted. If the packet is not too old, we throw it away as
+         * a duplicate, otherwise we mark discont and continue. 100 misordered
+         * packets is a good threshold. See also RFC 4737. */
+        if (gap < 100)
+          goto dropping;
+
+        GST_LOG_OBJECT (filter,
+            "%d > 100, packet too old, sender likely restarted", gap);
+        discont = TRUE;
+      }
+    }
+  }
+  priv->next_seqnum = (seqnum + 1) & 0xffff;
+
+  if (G_UNLIKELY (discont && !priv->discont)) {
+    GST_LOG_OBJECT (filter, "mark DISCONT on input buffer");
+    /* we detected a seqnum discont but the buffer was not flagged with a discont,
+     * set the discont flag so that the subclass can throw away old data. */
+    priv->discont = TRUE;
+    GST_BUFFER_FLAG_SET (in, GST_BUFFER_FLAG_DISCONT);
+  }
+
   bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
 
+  if (G_UNLIKELY (bclass->process == NULL))
+    goto no_process;
+
   /* let's send it out to processing */
   out_buf = bclass->process (filter, in);
   if (out_buf) {
-    guint32 rtptime;
-
-    rtptime = gst_rtp_buffer_get_timestamp (in);
-
     /* we pass rtptime as backward compatibility, in reality, the incomming
      * buffer timestamp is always applied to the outgoing packet. */
     ret = gst_base_rtp_depayload_push_ts (filter, rtptime, out_buf);
@@ -281,6 +351,38 @@
   gst_buffer_unref (in);
 
   return ret;
+
+  /* ERRORS */
+not_negotiated:
+  {
+    /* this is not fatal but should be filtered earlier */
+    GST_ELEMENT_ERROR (filter, CORE, NEGOTIATION, (NULL),
+        ("Not RTP format was negotiated"));
+    gst_buffer_unref (in);
+    return GST_FLOW_NOT_NEGOTIATED;
+  }
+invalid_buffer:
+  {
+    /* this is not fatal but should be filtered earlier */
+    GST_ELEMENT_WARNING (filter, STREAM, DECODE, (NULL),
+        ("Received invalid RTP payload, dropping"));
+    gst_buffer_unref (in);
+    return GST_FLOW_OK;
+  }
+dropping:
+  {
+    GST_WARNING_OBJECT (filter, "%d <= 100, dropping old packet", gap);
+    gst_buffer_unref (in);
+    return GST_FLOW_OK;
+  }
+no_process:
+  {
+    /* this is not fatal but should be filtered earlier */
+    GST_ELEMENT_ERROR (filter, STREAM, NOT_IMPLEMENTED, (NULL),
+        ("The subclass does not have a process method"));
+    gst_buffer_unref (in);
+    return GST_FLOW_ERROR;
+  }
 }
 
 static gboolean
@@ -297,6 +399,7 @@
 
       gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED);
       filter->need_newsegment = TRUE;
+      filter->priv->next_seqnum = -1;
       break;
     case GST_EVENT_NEWSEGMENT:
     {
@@ -316,6 +419,29 @@
       gst_event_unref (event);
       break;
     }
+    case GST_EVENT_CUSTOM_DOWNSTREAM:
+    {
+      GstBaseRTPDepayloadClass *bclass;
+
+      bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
+
+      if (gst_event_has_name (event, "GstRTPPacketLost")) {
+        /* we get this event from the jitterbuffer when it considers a packet as
+         * being lost. We send it to our packet_lost vmethod. The default
+         * implementation will make time progress by pushing out a NEWSEGMENT
+         * update event. Subclasses can override and to one of the following:
+         *  - Adjust timestamp/duration to something more accurate before
+         *    calling the parent (default) packet_lost method.
+         *  - do some more advanced error concealing on the already received
+         *    (fragmented) packets.
+         *  - ignore the packet lost.
+         */
+        if (bclass->packet_lost)
+          res = bclass->packet_lost (filter, event);
+      }
+      gst_event_unref (event);
+      break;
+    }
     default:
       /* pass other events forward */
       res = gst_pad_push_event (filter->srcpad, event);
@@ -324,6 +450,28 @@
   return res;
 }
 
+static GstEvent *
+create_segment_event (GstBaseRTPDepayload * filter, gboolean update,
+    GstClockTime position)
+{
+  GstEvent *event;
+  GstClockTime stop;
+  GstBaseRTPDepayloadPrivate *priv;
+
+  priv = filter->priv;
+
+  if (priv->npt_stop != -1)
+    stop = priv->npt_stop - priv->npt_start;
+  else
+    stop = -1;
+
+  event = gst_event_new_new_segment_full (update, priv->play_speed,
+      priv->play_scale, GST_FORMAT_TIME, position, stop,
+      position + priv->npt_start);
+
+  return event;
+}
+
 static GstFlowReturn
 gst_base_rtp_depayload_push_full (GstBaseRTPDepayload * filter,
     gboolean do_ts, guint32 rtptime, GstBuffer * out_buf)
@@ -337,7 +485,7 @@
 
   /* set the caps if any */
   srccaps = GST_PAD_CAPS (filter->srcpad);
-  if (srccaps)
+  if (G_LIKELY (srccaps))
     gst_buffer_set_caps (out_buf, srccaps);
 
   bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
@@ -346,7 +494,20 @@
   if (bclass->set_gst_timestamp && do_ts)
     bclass->set_gst_timestamp (filter, rtptime, out_buf);
 
-  if (priv->discont) {
+  /* if this is the first buffer send a NEWSEGMENT */
+  if (G_UNLIKELY (filter->need_newsegment)) {
+    GstEvent *event;
+
+    event = create_segment_event (filter, FALSE, 0);
+
+    gst_pad_push_event (filter->srcpad, event);
+
+    filter->need_newsegment = FALSE;
+    GST_DEBUG_OBJECT (filter, "Pushed newsegment event on this first buffer");
+  }
+
+  if (G_UNLIKELY (priv->discont)) {
+    GST_LOG_OBJECT (filter, "Marking DISCONT on output buffer");
     GST_BUFFER_FLAG_SET (out_buf, GST_BUFFER_FLAG_DISCONT);
     priv->discont = FALSE;
   }
@@ -355,8 +516,8 @@
   GST_LOG_OBJECT (filter, "Pushing buffer size %d, timestamp %" GST_TIME_FORMAT,
       GST_BUFFER_SIZE (out_buf),
       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (out_buf)));
+
   ret = gst_pad_push (filter->srcpad, out_buf);
-  GST_LOG_OBJECT (filter, "Pushed buffer: %s", gst_flow_get_name (ret));
 
   return ret;
 }
@@ -410,6 +571,38 @@
   return gst_base_rtp_depayload_push_full (filter, FALSE, 0, out_buf);
 }
 
+/* convert the PacketLost event form a jitterbuffer to a segment update.
+ * subclasses can override this.  */
+static gboolean
+gst_base_rtp_depayload_packet_lost (GstBaseRTPDepayload * filter,
+    GstEvent * event)
+{
+  GstBaseRTPDepayloadPrivate *priv;
+  GstClockTime timestamp, duration, position;
+  GstEvent *sevent;
+  const GstStructure *s;
+
+  priv = filter->priv;
+
+  s = gst_event_get_structure (event);
+
+  /* first start by parsing the timestamp and duration */
+  timestamp = -1;
+  duration = -1;
+
+  gst_structure_get_clock_time (s, "timestamp", &timestamp);
+  gst_structure_get_clock_time (s, "duration", &duration);
+
+  position = timestamp;
+  if (duration != -1)
+    position += duration;
+
+  /* update the current segment with the elapsed time */
+  sevent = create_segment_event (filter, TRUE, position);
+
+  return gst_pad_push_event (filter->srcpad, sevent);
+}
+
 static void
 gst_base_rtp_depayload_set_gst_timestamp (GstBaseRTPDepayload * filter,
     guint32 rtptime, GstBuffer * buf)
@@ -428,28 +621,6 @@
     GST_BUFFER_TIMESTAMP (buf) = priv->timestamp;
   if (!GST_CLOCK_TIME_IS_VALID (duration))
     GST_BUFFER_DURATION (buf) = priv->duration;
-
-  /* if this is the first buffer send a NEWSEGMENT */
-  if (filter->need_newsegment) {
-    GstEvent *event;
-    GstClockTime stop, position;
-
-    if (priv->npt_stop != -1)
-      stop = priv->npt_stop - priv->npt_start;
-    else
-      stop = -1;
-
-    position = priv->npt_start;
-
-    event =
-        gst_event_new_new_segment_full (FALSE, priv->play_speed,
-        priv->play_scale, GST_FORMAT_TIME, 0, stop, position);
-
-    gst_pad_push_event (filter->srcpad, event);
-
-    filter->need_newsegment = FALSE;
-    GST_DEBUG_OBJECT (filter, "Pushed newsegment event on this first buffer");
-  }
 }
 
 static GstStateChangeReturn
@@ -472,6 +643,8 @@
       priv->npt_stop = -1;
       priv->play_speed = 1.0;
       priv->play_scale = 1.0;
+      priv->next_seqnum = -1;
+      priv->negotiated = FALSE;
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       break;