gst_plugins_base/gst-libs/gst/rtsp/gstrtspconnection.c
author Dremov Kirill (Nokia-D-MSW/Tampere) <kirill.dremov@nokia.com>
Tue, 31 Aug 2010 15:30:33 +0300
branchRCL_3
changeset 29 567bb019e3e3
parent 0 0e761a78d257
child 30 7e817e7e631c
permissions -rw-r--r--
Revision: 201010 Kit: 201035

/* GStreamer
 * Copyright (C) <2005-2009> Wim Taymans <wim.taymans@gmail.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */
/*
 * Unless otherwise indicated, Source Code is licensed under MIT license.
 * See further explanation attached in License Statement (distributed in the file
 * LICENSE).
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is furnished to do
 * so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

/**
 * SECTION:gstrtspconnection
 * @short_description: manage RTSP connections
 * @see_also: gstrtspurl
 *  
 * <refsect2>
 * <para>
 * This object manages the RTSP connection to the server. It provides function
 * to receive and send bytes and messages.
 * </para>
 * </refsect2>
 *  
 * Last reviewed on 2007-07-24 (0.10.14)
 */

#ifdef HAVE_CONFIG_H
#  include <config.h>
#endif

#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

/* we include this here to get the G_OS_* defines */
#include <glib.h>
#include <gst/gst.h>

#ifdef G_OS_WIN32
/* ws2_32.dll has getaddrinfo and freeaddrinfo on Windows XP and later.
 * minwg32 headers check WINVER before allowing the use of these */
#ifndef WINVER
#define WINVER 0x0501
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
#define EINPROGRESS WSAEINPROGRESS
#else
#include <sys/ioctl.h>
#include <netdb.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <netinet/in.h>
#endif

#ifdef HAVE_FIONREAD_IN_SYS_FILIO
#include <sys/filio.h>
#endif

#include "gstrtspconnection.h"
#include "gstrtspbase64.h"
#include "md5.h"

union gst_sockaddr
{
  struct sockaddr sa;
  struct sockaddr_in sa_in;
  struct sockaddr_in6 sa_in6;
  struct sockaddr_storage sa_stor;
};

typedef struct
{
  gint state;
  guint save;
  guchar out[3];                /* the size must be evenly divisible by 3 */
  guint cout;
  guint coutl;
} DecodeCtx;

static GstRTSPResult read_line (gint fd, guint8 * buffer, guint * idx,
    guint size, DecodeCtx * ctxp);
static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key,
    guint keysize, gchar ** value);
static void parse_string (gchar * dest, gint size, gchar ** src);

#ifdef G_OS_WIN32
#define READ_SOCKET(fd, buf, len) recv (fd, (char *)buf, len, 0)
#define WRITE_SOCKET(fd, buf, len) send (fd, (const char *)buf, len, 0)
#define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, (const char *)val, len)
#define CLOSE_SOCKET(sock) closesocket (sock)
#define ERRNO_IS_EAGAIN (WSAGetLastError () == WSAEWOULDBLOCK)
#define ERRNO_IS_EINTR (WSAGetLastError () == WSAEINTR)
/* According to Microsoft's connect() documentation this one returns
 * WSAEWOULDBLOCK and not WSAEINPROGRESS. */
#define ERRNO_IS_EINPROGRESS (WSAGetLastError () == WSAEWOULDBLOCK)
#else
#define READ_SOCKET(fd, buf, len) read (fd, buf, len)
#define WRITE_SOCKET(fd, buf, len) write (fd, buf, len)
#define SETSOCKOPT(sock, level, name, val, len) setsockopt (sock, level, name, val, len)
#define CLOSE_SOCKET(sock) close (sock)
#define ERRNO_IS_EAGAIN (errno == EAGAIN)
#define ERRNO_IS_EINTR (errno == EINTR)
#define ERRNO_IS_EINPROGRESS (errno == EINPROGRESS)
#endif

#define ADD_POLLFD(fdset, pfd, fd)        \
G_STMT_START {                            \
  (pfd)->fd = fd;                         \
  gst_poll_add_fd (fdset, pfd);           \
} G_STMT_END

#define REMOVE_POLLFD(fdset, pfd)          \
G_STMT_START {                             \
  if ((pfd)->fd != -1) {                   \
    GST_DEBUG ("remove fd %d", (pfd)->fd); \
    gst_poll_remove_fd (fdset, pfd);       \
    CLOSE_SOCKET ((pfd)->fd);              \
    (pfd)->fd = -1;                        \
  }                                        \
} G_STMT_END

typedef enum
{
  TUNNEL_STATE_NONE,
  TUNNEL_STATE_GET,
  TUNNEL_STATE_POST,
  TUNNEL_STATE_COMPLETE
} GstRTSPTunnelState;

#define TUNNELID_LEN   24

struct _GstRTSPConnection
{
  /*< private > */
  /* URL for the connection */
  GstRTSPUrl *url;

  /* connection state */
  GstPollFD fd0;
  GstPollFD fd1;

  GstPollFD *readfd;
  GstPollFD *writefd;

  gchar tunnelid[TUNNELID_LEN];
  gboolean tunneled;
  GstRTSPTunnelState tstate;

  GstPoll *fdset;
  gchar *ip;

  /* Session state */
  gint cseq;                    /* sequence number */
  gchar session_id[512];        /* session id */
  gint timeout;                 /* session timeout in seconds */
  GTimer *timer;                /* timeout timer */

  /* Authentication */
  GstRTSPAuthMethod auth_method;
  gchar *username;
  gchar *passwd;
  GHashTable *auth_params;

  DecodeCtx ctx;
  DecodeCtx *ctxp;

  gchar *proxy_host;
  guint proxy_port;
};

enum
{
  STATE_START = 0,
  STATE_DATA_HEADER,
  STATE_DATA_BODY,
  STATE_READ_LINES,
  STATE_END,
  STATE_LAST
};

/* a structure for constructing RTSPMessages */
typedef struct
{
  gint state;
  guint8 buffer[4096];
  guint offset;

  guint line;
  guint8 *body_data;
  glong body_len;
} GstRTSPBuilder;

static void
build_reset (GstRTSPBuilder * builder)
{
  g_free (builder->body_data);
  memset (builder, 0, sizeof (GstRTSPBuilder));
}

/**
 * gst_rtsp_connection_create:
 * @url: a #GstRTSPUrl 
 * @conn: storage for a #GstRTSPConnection
 *
 * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
 * The connection will not yet attempt to connect to @url, use
 * gst_rtsp_connection_connect().
 *
 * A copy of @url will be made.
 *
 * Returns: #GST_RTSP_OK when @conn contains a valid connection.
 */
GstRTSPResult
gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
{
  GstRTSPConnection *newconn;
#ifdef G_OS_WIN32
  WSADATA w;
  int error;
#endif

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);

#ifdef G_OS_WIN32
  error = WSAStartup (0x0202, &w);

  if (error)
    goto startup_error;

  if (w.wVersion != 0x0202)
    goto version_error;
#endif

  newconn = g_new0 (GstRTSPConnection, 1);

  if ((newconn->fdset = gst_poll_new (TRUE)) == NULL)
    goto no_fdset;

  newconn->url = gst_rtsp_url_copy (url);
  newconn->fd0.fd = -1;
  newconn->fd1.fd = -1;
  newconn->timer = g_timer_new ();
  newconn->timeout = 60;
  newconn->cseq = 1;

  newconn->auth_method = GST_RTSP_AUTH_NONE;
  newconn->username = NULL;
  newconn->passwd = NULL;
  newconn->auth_params = NULL;

  *conn = newconn;

  return GST_RTSP_OK;

  /* ERRORS */
#ifdef G_OS_WIN32
startup_error:
  {
    g_warning ("Error %d on WSAStartup", error);
    return GST_RTSP_EWSASTART;
  }
version_error:
  {
    g_warning ("Windows sockets are not version 0x202 (current 0x%x)",
        w.wVersion);
    WSACleanup ();
    return GST_RTSP_EWSAVERSION;
  }
#endif
no_fdset:
  {
    g_free (newconn);
#ifdef G_OS_WIN32
    WSACleanup ();
#endif
    return GST_RTSP_ESYS;
  }
}

/**
 * gst_rtsp_connection_accept:
 * @sock: a socket
 * @conn: storage for a #GstRTSPConnection
 *
 * Accept a new connection on @sock and create a new #GstRTSPConnection for
 * handling communication on new socket.
 *
 * Returns: #GST_RTSP_OK when @conn contains a valid connection.
 *
 * Since: 0.10.23
 */
GstRTSPResult
gst_rtsp_connection_accept (gint sock, GstRTSPConnection ** conn)
{
  int fd;
  GstRTSPConnection *newconn = NULL;
  union gst_sockaddr sa;
  socklen_t slen = sizeof (sa);
  gchar ip[INET6_ADDRSTRLEN];
  GstRTSPUrl *url;
#ifdef G_OS_WIN32
  gulong flags = 1;
#endif

  memset (&sa, 0, slen);

#ifndef G_OS_WIN32
  fd = accept (sock, &sa.sa, &slen);
#else
  fd = accept (sock, &sa.sa, (gint *) & slen);
#endif /* G_OS_WIN32 */
  if (fd == -1)
    goto accept_failed;

  if (getnameinfo (&sa.sa, slen, ip, sizeof (ip), NULL, 0, NI_NUMERICHOST) != 0)
    goto getnameinfo_failed;
  if (sa.sa.sa_family != AF_INET && sa.sa.sa_family != AF_INET6)
    goto wrong_family;

  /* set to non-blocking mode so that we can cancel the communication */
#ifndef G_OS_WIN32
  fcntl (fd, F_SETFL, O_NONBLOCK);
#else
  ioctlsocket (fd, FIONBIO, &flags);
#endif /* G_OS_WIN32 */

  /* create a url for the client address */
  url = g_new0 (GstRTSPUrl, 1);
  url->host = g_strdup (ip);
  if (sa.sa.sa_family == AF_INET)
    url->port = sa.sa_in.sin_port;
  else
    url->port = sa.sa_in6.sin6_port;

  /* now create the connection object */
  gst_rtsp_connection_create (url, &newconn);
  gst_rtsp_url_free (url);

  ADD_POLLFD (newconn->fdset, &newconn->fd0, fd);

  /* both read and write initially */
  newconn->readfd = &newconn->fd0;
  newconn->writefd = &newconn->fd0;

  *conn = newconn;

  return GST_RTSP_OK;

  /* ERRORS */
accept_failed:
  {
    return GST_RTSP_ESYS;
  }
getnameinfo_failed:
wrong_family:
  {
    close (fd);
    return GST_RTSP_ERROR;
  }
}

static gchar *
do_resolve (const gchar * host)
{
  static gchar ip[INET6_ADDRSTRLEN];
  struct addrinfo *aires;
  struct addrinfo *ai;
  gint aierr;

  aierr = getaddrinfo (host, NULL, NULL, &aires);
  if (aierr != 0)
    goto no_addrinfo;

  for (ai = aires; ai; ai = ai->ai_next) {
    if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) {
      break;
    }
  }
  if (ai == NULL)
    goto no_family;

  aierr = getnameinfo (ai->ai_addr, ai->ai_addrlen, ip, sizeof (ip), NULL, 0,
      NI_NUMERICHOST | NI_NUMERICSERV);
  if (aierr != 0)
    goto no_address;

  freeaddrinfo (aires);

  return g_strdup (ip);

  /* ERRORS */
no_addrinfo:
  {
    GST_ERROR ("no addrinfo found for %s: %s", host, gai_strerror (aierr));
    return NULL;
  }
no_family:
  {
    GST_ERROR ("no family found for %s", host);
    freeaddrinfo (aires);
    return NULL;
  }
no_address:
  {
    GST_ERROR ("no address found for %s: %s", host, gai_strerror (aierr));
    freeaddrinfo (aires);
    return NULL;
  }
}

static GstRTSPResult
do_connect (const gchar * ip, guint16 port, GstPollFD * fdout,
    GstPoll * fdset, GTimeVal * timeout)
{
  gint fd;
  struct addrinfo hints;
  struct addrinfo *aires;
  struct addrinfo *ai;
  gint aierr;
  gchar service[NI_MAXSERV];
  gint ret;
#ifdef G_OS_WIN32
  unsigned long flags = 1;
#endif /* G_OS_WIN32 */
  GstClockTime to;
  gint retval;

  memset (&hints, 0, sizeof hints);
  hints.ai_flags = AI_NUMERICHOST;
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  g_snprintf (service, sizeof (service) - 1, "%hu", port);
  service[sizeof (service) - 1] = '\0';

  aierr = getaddrinfo (ip, service, &hints, &aires);
  if (aierr != 0)
    goto no_addrinfo;

  for (ai = aires; ai; ai = ai->ai_next) {
    if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) {
      break;
    }
  }
  if (ai == NULL)
    goto no_family;

  fd = socket (ai->ai_family, SOCK_STREAM, 0);
  if (fd == -1)
    goto no_socket;

  /* set to non-blocking mode so that we can cancel the connect */
#ifndef G_OS_WIN32
  fcntl (fd, F_SETFL, O_NONBLOCK);
#else
  ioctlsocket (fd, FIONBIO, &flags);
#endif /* G_OS_WIN32 */

  /* add the socket to our fdset */
  ADD_POLLFD (fdset, fdout, fd);

  /* we are going to connect ASYNC now */
  ret = connect (fd, ai->ai_addr, ai->ai_addrlen);
  if (ret == 0)
    goto done;
  if (!ERRNO_IS_EINPROGRESS)
    goto sys_error;

  /* wait for connect to complete up to the specified timeout or until we got
   * interrupted. */
  gst_poll_fd_ctl_write (fdset, fdout, TRUE);

  to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;

  do {
    retval = gst_poll_wait (fdset, to);
  } while (retval == -1 && (errno == EINTR || errno == EAGAIN));

  if (retval == 0)
    goto timeout;
  else if (retval == -1)
    goto sys_error;

  /* we can still have an error connecting on windows */
  if (gst_poll_fd_has_error (fdset, fdout)) {
    socklen_t len = sizeof (errno);
#ifndef G_OS_WIN32
    getsockopt (fd, SOL_SOCKET, SO_ERROR, &errno, &len);
#else
    getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len);
#endif
    goto sys_error;
  }

  gst_poll_fd_ignored (fdset, fdout);

done:
  freeaddrinfo (aires);

  return GST_RTSP_OK;

  /* ERRORS */
no_addrinfo:
  {
    GST_ERROR ("no addrinfo found for %s: %s", ip, gai_strerror (aierr));
    return GST_RTSP_ERROR;
  }
no_family:
  {
    GST_ERROR ("no family found for %s", ip);
    freeaddrinfo (aires);
    return GST_RTSP_ERROR;
  }
no_socket:
  {
    GST_ERROR ("no socket %d (%s)", errno, g_strerror (errno));
    freeaddrinfo (aires);
    return GST_RTSP_ESYS;
  }
sys_error:
  {
    GST_ERROR ("system error %d (%s)", errno, g_strerror (errno));
    REMOVE_POLLFD (fdset, fdout);
    freeaddrinfo (aires);
    return GST_RTSP_ESYS;
  }
timeout:
  {
    GST_ERROR ("timeout");
    REMOVE_POLLFD (fdset, fdout);
    freeaddrinfo (aires);
    return GST_RTSP_ETIMEOUT;
  }
}

static GstRTSPResult
setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
{
  gint i;
  GstRTSPResult res;
  gchar *str;
  guint idx, line;
  gint retval;
  GstClockTime to;
  gchar *ip, *url_port_str;
  guint16 port, url_port;
  gchar codestr[4], *resultstr;
  gint code;
  GstRTSPUrl *url;
  gchar *hostparam;

  /* create a random sessionid */
  for (i = 0; i < TUNNELID_LEN; i++)
    conn->tunnelid[i] = g_random_int_range ('a', 'z');
  conn->tunnelid[TUNNELID_LEN - 1] = '\0';

  url = conn->url;
  /* get the port from the url */
  gst_rtsp_url_get_port (url, &url_port);

  if (conn->proxy_host) {
    hostparam = g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port);
    url_port_str = g_strdup_printf (":%d", url_port);
    ip = conn->proxy_host;
    port = conn->proxy_port;
  } else {
    hostparam = NULL;
    url_port_str = NULL;
    ip = conn->ip;
    port = url_port;
  }

  /* */
  str = g_strdup_printf ("GET %s%s%s%s%s%s HTTP/1.0\r\n"
      "%s"
      "x-sessioncookie: %s\r\n"
      "Accept: application/x-rtsp-tunnelled\r\n"
      "Pragma: no-cache\r\n"
      "Cache-Control: no-cache\r\n" "\r\n",
      conn->proxy_host ? "http://" : "",
      conn->proxy_host ? url->host : "",
      conn->proxy_host ? url_port_str : "",
      url->abspath, url->query ? "?" : "", url->query ? url->query : "",
      hostparam ? hostparam : "", conn->tunnelid);

  /* we start by writing to this fd */
  conn->writefd = &conn->fd0;

  res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout);
  g_free (str);
  if (res != GST_RTSP_OK)
    goto write_failed;

  gst_poll_fd_ctl_write (conn->fdset, &conn->fd0, FALSE);
  gst_poll_fd_ctl_read (conn->fdset, &conn->fd0, TRUE);

  to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;

  line = 0;
  while (TRUE) {
    guint8 buffer[4096];

    idx = 0;
    while (TRUE) {
      res = read_line (conn->fd0.fd, buffer, &idx, sizeof (buffer), NULL);
      if (res == GST_RTSP_EEOF)
        goto eof;
      if (res == GST_RTSP_OK)
        break;
      if (res != GST_RTSP_EINTR)
        goto read_error;

      do {
        retval = gst_poll_wait (conn->fdset, to);
      } while (retval == -1 && (errno == EINTR || errno == EAGAIN));

      /* check for timeout */
      if (retval == 0)
        goto timeout;

      if (retval == -1) {
        if (errno == EBUSY)
          goto stopped;
        else
          goto select_error;
      }
    }

    /* check for last line */
    if (buffer[0] == '\r')
      buffer[0] = '\0';
    if (buffer[0] == '\0')
      break;

    if (line == 0) {
      /* first line, parse response */
      gchar versionstr[20];
      gchar *bptr;

      bptr = (gchar *) buffer;

      parse_string (versionstr, sizeof (versionstr), &bptr);
      parse_string (codestr, sizeof (codestr), &bptr);
      code = atoi (codestr);

      while (g_ascii_isspace (*bptr))
        bptr++;

      resultstr = bptr;

      if (code != GST_RTSP_STS_OK)
        goto wrong_result;
    } else {
      gchar key[32];
      gchar *value;

      /* other lines, parse key/value */
      res = parse_key_value (buffer, key, sizeof (key), &value);
      if (res == GST_RTSP_OK) {
        /* we got a new ip address */
        if (g_ascii_strcasecmp (key, "x-server-ip-address") == 0) {
          if (conn->proxy_host) {
            /* if we use a proxy we need to change the destination url */
            g_free (url->host);
            url->host = g_strdup (value);
            g_free (hostparam);
            g_free (url_port_str);
            hostparam =
                g_strdup_printf ("Host: %s:%d\r\n", url->host, url_port);
            url_port_str = g_strdup_printf (":%d", url_port);
          } else {
            /* and resolve the new ip address */
            if (!(ip = do_resolve (conn->ip)))
              goto not_resolved;
            g_free (conn->ip);
            conn->ip = ip;
          }
        }
      }
    }
    line++;
  }

  /* connect to the host/port */
  res = do_connect (ip, port, &conn->fd1, conn->fdset, timeout);
  if (res != GST_RTSP_OK)
    goto connect_failed;

  /* this is now our writing socket */
  conn->writefd = &conn->fd1;

  /* */
  str = g_strdup_printf ("POST %s%s%s%s%s%s HTTP/1.0\r\n"
      "%s"
      "x-sessioncookie: %s\r\n"
      "Content-Type: application/x-rtsp-tunnelled\r\n"
      "Pragma: no-cache\r\n"
      "Cache-Control: no-cache\r\n"
      "Content-Length: 32767\r\n"
      "Expires: Sun, 9 Jan 1972 00:00:00 GMT\r\n"
      "\r\n",
      conn->proxy_host ? "http://" : "",
      conn->proxy_host ? url->host : "",
      conn->proxy_host ? url_port_str : "",
      url->abspath, url->query ? "?" : "", url->query ? url->query : "",
      hostparam ? hostparam : "", conn->tunnelid);

  res = gst_rtsp_connection_write (conn, (guint8 *) str, strlen (str), timeout);
  g_free (str);
  if (res != GST_RTSP_OK)
    goto write_failed;

exit:
  g_free (hostparam);
  g_free (url_port_str);

  return res;

  /* ERRORS */
write_failed:
  {
    GST_ERROR ("write failed (%d)", res);
    goto exit;
  }
eof:
  {
    res = GST_RTSP_EEOF;
    goto exit;
  }
read_error:
  {
    goto exit;
  }
timeout:
  {
    res = GST_RTSP_ETIMEOUT;
    goto exit;
  }
select_error:
  {
    res = GST_RTSP_ESYS;
    goto exit;
  }
stopped:
  {
    res = GST_RTSP_EINTR;
    goto exit;
  }
wrong_result:
  {
    GST_ERROR ("got failure response %d %s", code, resultstr);
    res = GST_RTSP_ERROR;
    goto exit;
  }
not_resolved:
  {
    GST_ERROR ("could not resolve %s", conn->ip);
    res = GST_RTSP_ENET;
    goto exit;
  }
connect_failed:
  {
    GST_ERROR ("failed to connect");
    goto exit;
  }
}

/**
 * gst_rtsp_connection_connect:
 * @conn: a #GstRTSPConnection 
 * @timeout: a #GTimeVal timeout
 *
 * Attempt to connect to the url of @conn made with
 * gst_rtsp_connection_create(). If @timeout is #NULL this function can block
 * forever. If @timeout contains a valid timeout, this function will return
 * #GST_RTSP_ETIMEOUT after the timeout expired.
 *
 * This function can be cancelled with gst_rtsp_connection_flush().
 *
 * Returns: #GST_RTSP_OK when a connection could be made.
 */
GstRTSPResult
gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
{
  GstRTSPResult res;
  gchar *ip;
  guint16 port;
  GstRTSPUrl *url;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->fd0.fd < 0, GST_RTSP_EINVAL);

  url = conn->url;

  if (conn->proxy_host && conn->tunneled) {
    if (!(ip = do_resolve (conn->proxy_host))) {
      GST_ERROR ("could not resolve %s", conn->proxy_host);
      goto not_resolved;
    }
    port = conn->proxy_port;
    g_free (conn->proxy_host);
    conn->proxy_host = ip;
  } else {
    if (!(ip = do_resolve (url->host))) {
      GST_ERROR ("could not resolve %s", url->host);
      goto not_resolved;
    }
    /* get the port from the url */
    gst_rtsp_url_get_port (url, &port);

    g_free (conn->ip);
    conn->ip = ip;
  }

  /* connect to the host/port */
  res = do_connect (ip, port, &conn->fd0, conn->fdset, timeout);
  if (res != GST_RTSP_OK)
    goto connect_failed;

  /* this is our read URL */
  conn->readfd = &conn->fd0;

  if (conn->tunneled) {
    res = setup_tunneling (conn, timeout);
    if (res != GST_RTSP_OK)
      goto tunneling_failed;
  } else {
    conn->writefd = &conn->fd0;
  }

  return GST_RTSP_OK;

not_resolved:
  {
    return GST_RTSP_ENET;
  }
connect_failed:
  {
    GST_ERROR ("failed to connect");
    return res;
  }
tunneling_failed:
  {
    GST_ERROR ("failed to setup tunneling");
    return res;
  }
}

static void
md5_digest_to_hex_string (unsigned char digest[16], char string[33])
{
  static const char hexdigits[] = "0123456789abcdef";
  int i;

  for (i = 0; i < 16; i++) {
    string[i * 2] = hexdigits[(digest[i] >> 4) & 0x0f];
    string[i * 2 + 1] = hexdigits[digest[i] & 0x0f];
  }
  string[32] = 0;
}

static void
auth_digest_compute_hex_urp (const gchar * username,
    const gchar * realm, const gchar * password, gchar hex_urp[33])
{
  struct MD5Context md5_context;
  unsigned char digest[16];

  MD5Init (&md5_context);
  MD5Update (&md5_context, username, strlen (username));
  MD5Update (&md5_context, ":", 1);
  MD5Update (&md5_context, realm, strlen (realm));
  MD5Update (&md5_context, ":", 1);
  MD5Update (&md5_context, password, strlen (password));
  MD5Final (digest, &md5_context);
  md5_digest_to_hex_string (digest, hex_urp);
}

static void
auth_digest_compute_response (const gchar * method,
    const gchar * uri, const gchar * hex_a1, const gchar * nonce,
    gchar response[33])
{
  char hex_a2[33];
  struct MD5Context md5_context;
  unsigned char digest[16];

  /* compute A2 */
  MD5Init (&md5_context);
  MD5Update (&md5_context, method, strlen (method));
  MD5Update (&md5_context, ":", 1);
  MD5Update (&md5_context, uri, strlen (uri));
  MD5Final (digest, &md5_context);
  md5_digest_to_hex_string (digest, hex_a2);

  /* compute KD */
  MD5Init (&md5_context);
  MD5Update (&md5_context, hex_a1, strlen (hex_a1));
  MD5Update (&md5_context, ":", 1);
  MD5Update (&md5_context, nonce, strlen (nonce));
  MD5Update (&md5_context, ":", 1);

  MD5Update (&md5_context, hex_a2, 32);
  MD5Final (digest, &md5_context);
  md5_digest_to_hex_string (digest, response);
}

static void
add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
{
  switch (conn->auth_method) {
    case GST_RTSP_AUTH_BASIC:{
      gchar *user_pass;
      gchar *user_pass64;
      gchar *auth_string;

      user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
      user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
      auth_string = g_strdup_printf ("Basic %s", user_pass64);

      gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
          auth_string);

      g_free (user_pass);
      g_free (user_pass64);
      break;
    }
    case GST_RTSP_AUTH_DIGEST:{
      gchar response[33], hex_urp[33];
      gchar *auth_string, *auth_string2;
      gchar *realm;
      gchar *nonce;
      gchar *opaque;
      const gchar *uri;
      const gchar *method;

      /* we need to have some params set */
      if (conn->auth_params == NULL)
        break;

      /* we need the realm and nonce */
      realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm");
      nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce");
      if (realm == NULL || nonce == NULL)
        break;

      auth_digest_compute_hex_urp (conn->username, realm, conn->passwd,
          hex_urp);

      method = gst_rtsp_method_as_text (message->type_data.request.method);
      uri = message->type_data.request.uri;

      /* Assume no qop, algorithm=md5, stale=false */
      /* For algorithm MD5, a1 = urp. */
      auth_digest_compute_response (method, uri, hex_urp, nonce, response);
      auth_string = g_strdup_printf ("Digest username=\"%s\", "
          "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"",
          conn->username, realm, nonce, uri, response);

      opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque");
      if (opaque) {
        auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string,
            opaque);
        g_free (auth_string);
        auth_string = auth_string2;
      }
      gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION,
          auth_string);
      break;
    }
    default:
      /* Nothing to do */
      break;
  }
}

static void
gen_date_string (gchar * date_string, guint len)
{
  GTimeVal tv;
  time_t t;
#ifdef HAVE_GMTIME_R
  struct tm tm_;
#endif

  g_get_current_time (&tv);
  t = (time_t) tv.tv_sec;

#ifdef HAVE_GMTIME_R
  strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime_r (&t, &tm_));
#else
  strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime (&t));
#endif
}

static GstRTSPResult
write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size)
{
  guint left;

  if (G_UNLIKELY (*idx > size))
    return GST_RTSP_ERROR;

  left = size - *idx;

  while (left) {
    gint r;

    r = WRITE_SOCKET (fd, &buffer[*idx], left);
    if (G_UNLIKELY (r == 0)) {
      return GST_RTSP_EINTR;
    } else if (G_UNLIKELY (r < 0)) {
      if (ERRNO_IS_EAGAIN)
        return GST_RTSP_EINTR;
      if (!ERRNO_IS_EINTR)
        return GST_RTSP_ESYS;
    } else {
      left -= r;
      *idx += r;
    }
  }
  return GST_RTSP_OK;
}

static gint
fill_bytes (gint fd, guint8 * buffer, guint size, DecodeCtx * ctx)
{
  gint out = 0;

  if (ctx) {
    while (size > 0) {
      guint8 in[sizeof (ctx->out) * 4 / 3];
      gint r;

      while (size > 0 && ctx->cout < ctx->coutl) {
        /* we have some leftover bytes */
        *buffer++ = ctx->out[ctx->cout++];
        size--;
        out++;
      }

      /* got what we needed? */
      if (size == 0)
        break;

      /* try to read more bytes */
      r = READ_SOCKET (fd, in, sizeof (in));
      if (r <= 0) {
        if (out == 0)
          out = r;
        break;
      }

      ctx->cout = 0;
      ctx->coutl =
          g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state,
          &ctx->save);
    }
  } else {
    out = READ_SOCKET (fd, buffer, size);
  }

  return out;
}

static GstRTSPResult
read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx)
{
  guint left;

  if (G_UNLIKELY (*idx > size))
    return GST_RTSP_ERROR;

  left = size - *idx;

  while (left) {
    gint r;

    r = fill_bytes (fd, &buffer[*idx], left, ctx);
    if (G_UNLIKELY (r == 0)) {
      return GST_RTSP_EEOF;
    } else if (G_UNLIKELY (r < 0)) {
      if (ERRNO_IS_EAGAIN)
        return GST_RTSP_EINTR;
      if (!ERRNO_IS_EINTR)
        return GST_RTSP_ESYS;
    } else {
      left -= r;
      *idx += r;
    }
  }
  return GST_RTSP_OK;
}

static GstRTSPResult
read_line (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx)
{
  while (TRUE) {
    guint8 c;
    gint r;

    r = fill_bytes (fd, &c, 1, ctx);
    if (G_UNLIKELY (r == 0)) {
      return GST_RTSP_EEOF;
    } else if (G_UNLIKELY (r < 0)) {
      if (ERRNO_IS_EAGAIN)
        return GST_RTSP_EINTR;
      if (!ERRNO_IS_EINTR)
        return GST_RTSP_ESYS;
    } else {
      if (c == '\n')            /* end on \n */
        break;
      if (c == '\r')            /* ignore \r */
        continue;

      if (G_LIKELY (*idx < size - 1))
        buffer[(*idx)++] = c;
    }
  }
  buffer[*idx] = '\0';

  return GST_RTSP_OK;
}

/**
 * gst_rtsp_connection_write:
 * @conn: a #GstRTSPConnection
 * @data: the data to write
 * @size: the size of @data
 * @timeout: a timeout value or #NULL
 *
 * Attempt to write @size bytes of @data to the connected @conn, blocking up to
 * the specified @timeout. @timeout can be #NULL, in which case this function
 * might block forever.
 * 
 * This function can be cancelled with gst_rtsp_connection_flush().
 *
 * Returns: #GST_RTSP_OK on success.
 */
GstRTSPResult
gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
    guint size, GTimeVal * timeout)
{
  guint offset;
  gint retval;
  GstClockTime to;
  GstRTSPResult res;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);

  gst_poll_set_controllable (conn->fdset, TRUE);
  gst_poll_fd_ctl_write (conn->fdset, conn->writefd, TRUE);
  gst_poll_fd_ctl_read (conn->fdset, conn->readfd, FALSE);
  /* clear all previous poll results */
  gst_poll_fd_ignored (conn->fdset, conn->writefd);
  gst_poll_fd_ignored (conn->fdset, conn->readfd);

  to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;

  offset = 0;

  while (TRUE) {
    /* try to write */
    res = write_bytes (conn->writefd->fd, data, &offset, size);
    if (G_LIKELY (res == GST_RTSP_OK))
      break;
    if (G_UNLIKELY (res != GST_RTSP_EINTR))
      goto write_error;

    /* not all is written, wait until we can write more */
    do {
      retval = gst_poll_wait (conn->fdset, to);
    } while (retval == -1 && (errno == EINTR || errno == EAGAIN));

    if (G_UNLIKELY (retval == 0))
      goto timeout;

    if (G_UNLIKELY (retval == -1)) {
      if (errno == EBUSY)
        goto stopped;
      else
        goto select_error;
    }
  }
  return GST_RTSP_OK;

  /* ERRORS */
timeout:
  {
    return GST_RTSP_ETIMEOUT;
  }
select_error:
  {
    return GST_RTSP_ESYS;
  }
stopped:
  {
    return GST_RTSP_EINTR;
  }
write_error:
  {
    return res;
  }
}

static GString *
message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
{
  GString *str = NULL;

  str = g_string_new ("");

  switch (message->type) {
    case GST_RTSP_MESSAGE_REQUEST:
      /* create request string, add CSeq */
      g_string_append_printf (str, "%s %s RTSP/1.0\r\n"
          "CSeq: %d\r\n",
          gst_rtsp_method_as_text (message->type_data.request.method),
          message->type_data.request.uri, conn->cseq++);
      /* add session id if we have one */
      if (conn->session_id[0] != '\0') {
        gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION,
            conn->session_id);
      }
      /* add any authentication headers */
      add_auth_header (conn, message);
      break;
    case GST_RTSP_MESSAGE_RESPONSE:
      /* create response string */
      g_string_append_printf (str, "RTSP/1.0 %d %s\r\n",
          message->type_data.response.code, message->type_data.response.reason);
      break;
    case GST_RTSP_MESSAGE_DATA:
    {
      guint8 data_header[4];

      /* prepare data header */
      data_header[0] = '$';
      data_header[1] = message->type_data.data.channel;
      data_header[2] = (message->body_size >> 8) & 0xff;
      data_header[3] = message->body_size & 0xff;

      /* create string with header and data */
      str = g_string_append_len (str, (gchar *) data_header, 4);
      str =
          g_string_append_len (str, (gchar *) message->body,
          message->body_size);
      break;
    }
    default:
      g_string_free (str, TRUE);
      g_return_val_if_reached (NULL);
      break;
  }

  /* append headers and body */
  if (message->type != GST_RTSP_MESSAGE_DATA) {
    gchar date_string[100];

    gen_date_string (date_string, sizeof (date_string));

    /* add date header */
    gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);

    /* append headers */
    gst_rtsp_message_append_headers (message, str);

    /* append Content-Length and body if needed */
    if (message->body != NULL && message->body_size > 0) {
      gchar *len;

      len = g_strdup_printf ("%d", message->body_size);
      g_string_append_printf (str, "%s: %s\r\n",
          gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len);
      g_free (len);
      /* header ends here */
      g_string_append (str, "\r\n");
      str =
          g_string_append_len (str, (gchar *) message->body,
          message->body_size);
    } else {
      /* just end headers */
      g_string_append (str, "\r\n");
    }
  }

  return str;
}

/**
 * gst_rtsp_connection_send:
 * @conn: a #GstRTSPConnection
 * @message: the message to send
 * @timeout: a timeout value or #NULL
 *
 * Attempt to send @message to the connected @conn, blocking up to
 * the specified @timeout. @timeout can be #NULL, in which case this function
 * might block forever.
 * 
 * This function can be cancelled with gst_rtsp_connection_flush().
 *
 * Returns: #GST_RTSP_OK on success.
 */
GstRTSPResult
gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
    GTimeVal * timeout)
{
  GString *string = NULL;
  GstRTSPResult res;
  gchar *str;
  gsize len;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);

  if (G_UNLIKELY (!(string = message_to_string (conn, message))))
    goto no_message;

  if (conn->tunneled) {
    str = g_base64_encode ((const guchar *) string->str, string->len);
    g_string_free (string, TRUE);
    len = strlen (str);
  } else {
    str = string->str;
    len = string->len;
    g_string_free (string, FALSE);
  }

  /* write request */
  res = gst_rtsp_connection_write (conn, (guint8 *) str, len, timeout);

  g_free (str);

  return res;

no_message:
  {
    g_warning ("Wrong message");
    return GST_RTSP_EINVAL;
  }
}

static void
parse_string (gchar * dest, gint size, gchar ** src)
{
  gint idx;

  idx = 0;
  /* skip spaces */
  while (g_ascii_isspace (**src))
    (*src)++;

  while (!g_ascii_isspace (**src) && **src != '\0') {
    if (idx < size - 1)
      dest[idx++] = **src;
    (*src)++;
  }
  if (size > 0)
    dest[idx] = '\0';
}

static void
parse_key (gchar * dest, gint size, gchar ** src)
{
  gint idx;

  idx = 0;
  while (**src != ':' && **src != '\0') {
    if (idx < size - 1)
      dest[idx++] = **src;
    (*src)++;
  }
  if (size > 0)
    dest[idx] = '\0';
}

static GstRTSPResult
parse_response_status (guint8 * buffer, GstRTSPMessage * msg)
{
  GstRTSPResult res;
  gchar versionstr[20];
  gchar codestr[4];
  gint code;
  gchar *bptr;

  bptr = (gchar *) buffer;

  parse_string (versionstr, sizeof (versionstr), &bptr);
  parse_string (codestr, sizeof (codestr), &bptr);
  code = atoi (codestr);

  while (g_ascii_isspace (*bptr))
    bptr++;

  if (strcmp (versionstr, "RTSP/1.0") == 0)
    GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
        parse_error);
  else if (strncmp (versionstr, "RTSP/", 5) == 0) {
    GST_RTSP_CHECK (gst_rtsp_message_init_response (msg, code, bptr, NULL),
        parse_error);
    msg->type_data.response.version = GST_RTSP_VERSION_INVALID;
  } else
    goto parse_error;

  return GST_RTSP_OK;

parse_error:
  {
    return GST_RTSP_EPARSE;
  }
}

static GstRTSPResult
parse_request_line (GstRTSPConnection * conn, guint8 * buffer,
    GstRTSPMessage * msg)
{
  GstRTSPResult res = GST_RTSP_OK;
  gchar versionstr[20];
  gchar methodstr[20];
  gchar urlstr[4096];
  gchar *bptr;
  GstRTSPMethod method;
  GstRTSPTunnelState tstate = TUNNEL_STATE_NONE;

  bptr = (gchar *) buffer;

  parse_string (methodstr, sizeof (methodstr), &bptr);
  method = gst_rtsp_find_method (methodstr);
  if (method == GST_RTSP_INVALID) {
    /* a tunnel request is allowed when we don't have one yet */
    if (conn->tstate != TUNNEL_STATE_NONE)
      goto invalid_method;
    /* we need GET or POST for a valid tunnel request */
    if (!strcmp (methodstr, "GET"))
      tstate = TUNNEL_STATE_GET;
    else if (!strcmp (methodstr, "POST"))
      tstate = TUNNEL_STATE_POST;
    else
      goto invalid_method;
  }

  parse_string (urlstr, sizeof (urlstr), &bptr);
  if (G_UNLIKELY (*urlstr == '\0'))
    goto invalid_url;

  parse_string (versionstr, sizeof (versionstr), &bptr);

  if (G_UNLIKELY (*bptr != '\0'))
    goto invalid_version;

  if (strcmp (versionstr, "RTSP/1.0") == 0) {
    res = gst_rtsp_message_init_request (msg, method, urlstr);
  } else if (strncmp (versionstr, "RTSP/", 5) == 0) {
    res = gst_rtsp_message_init_request (msg, method, urlstr);
    msg->type_data.request.version = GST_RTSP_VERSION_INVALID;
  } else if (strcmp (versionstr, "HTTP/1.0") == 0) {
    /* tunnel request, we need a tunnel method */
    if (tstate == TUNNEL_STATE_NONE) {
      res = GST_RTSP_EPARSE;
    } else {
      conn->tstate = tstate;
    }
  } else {
    res = GST_RTSP_EPARSE;
  }

  return res;

  /* ERRORS */
invalid_method:
  {
    GST_ERROR ("invalid method %s", methodstr);
    return GST_RTSP_EPARSE;
  }
invalid_url:
  {
    GST_ERROR ("invalid url %s", urlstr);
    return GST_RTSP_EPARSE;
  }
invalid_version:
  {
    GST_ERROR ("invalid version");
    return GST_RTSP_EPARSE;
  }
}

static GstRTSPResult
parse_key_value (guint8 * buffer, gchar * key, guint keysize, gchar ** value)
{
  gchar *bptr;

  bptr = (gchar *) buffer;

  /* read key */
  parse_key (key, keysize, &bptr);
  if (G_UNLIKELY (*bptr != ':'))
    goto no_column;

  bptr++;
  while (g_ascii_isspace (*bptr))
    bptr++;

  *value = bptr;

  return GST_RTSP_OK;

  /* ERRORS */
no_column:
  {
    return GST_RTSP_EPARSE;
  }
}

/* parsing lines means reading a Key: Value pair */
static GstRTSPResult
parse_line (GstRTSPConnection * conn, guint8 * buffer, GstRTSPMessage * msg)
{
  GstRTSPResult res;
  gchar key[32];
  gchar *value;
  GstRTSPHeaderField field;

  res = parse_key_value (buffer, key, sizeof (key), &value);
  if (G_UNLIKELY (res != GST_RTSP_OK))
    goto parse_error;

  if (conn->tstate == TUNNEL_STATE_GET || conn->tstate == TUNNEL_STATE_POST) {
    /* save the tunnel session in the connection */
    if (!strcmp (key, "x-sessioncookie")) {
      strncpy (conn->tunnelid, value, TUNNELID_LEN);
      conn->tunnelid[TUNNELID_LEN - 1] = '\0';
      conn->tunneled = TRUE;
    }
  } else {
    field = gst_rtsp_find_header_field (key);
    if (field != GST_RTSP_HDR_INVALID)
      gst_rtsp_message_add_header (msg, field, value);
  }

  return GST_RTSP_OK;

  /* ERRORS */
parse_error:
  {
    return res;
  }
}

/* returns:
 *  GST_RTSP_OK when a complete message was read.
 *  GST_RTSP_EEOF: when the socket is closed
 *  GST_RTSP_EINTR: when more data is needed.
 *  GST_RTSP_..: some other error occured.
 */
static GstRTSPResult
build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
    GstRTSPConnection * conn)
{
  GstRTSPResult res;

  while (TRUE) {
    switch (builder->state) {
      case STATE_START:
        builder->offset = 0;
        res =
            read_bytes (conn->readfd->fd, (guint8 *) builder->buffer,
            &builder->offset, 1, conn->ctxp);
        if (res != GST_RTSP_OK)
          goto done;

        /* we have 1 bytes now and we can see if this is a data message or
         * not */
        if (builder->buffer[0] == '$') {
          /* data message, prepare for the header */
          builder->state = STATE_DATA_HEADER;
        } else {
          builder->line = 0;
          builder->state = STATE_READ_LINES;
        }
        break;
      case STATE_DATA_HEADER:
      {
        res =
            read_bytes (conn->readfd->fd, (guint8 *) builder->buffer,
            &builder->offset, 4, conn->ctxp);
        if (res != GST_RTSP_OK)
          goto done;

        gst_rtsp_message_init_data (message, builder->buffer[1]);

        builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3];
        builder->body_data = g_malloc (builder->body_len + 1);
        builder->body_data[builder->body_len] = '\0';
        builder->offset = 0;
        builder->state = STATE_DATA_BODY;
        break;
      }
      case STATE_DATA_BODY:
      {
        res =
            read_bytes (conn->readfd->fd, builder->body_data, &builder->offset,
            builder->body_len, conn->ctxp);
        if (res != GST_RTSP_OK)
          goto done;

        /* we have the complete body now, store in the message adjusting the
         * length to include the traling '\0' */
        gst_rtsp_message_take_body (message,
            (guint8 *) builder->body_data, builder->body_len + 1);
        builder->body_data = NULL;
        builder->body_len = 0;

        builder->state = STATE_END;
        break;
      }
      case STATE_READ_LINES:
      {
        res = read_line (conn->readfd->fd, builder->buffer, &builder->offset,
            sizeof (builder->buffer), conn->ctxp);
        if (res != GST_RTSP_OK)
          goto done;

        /* we have a regular response */
        if (builder->buffer[0] == '\r') {
          builder->buffer[0] = '\0';
        }

        if (builder->buffer[0] == '\0') {
          gchar *hdrval;

          /* empty line, end of message header */
          /* see if there is a Content-Length header */
          if (gst_rtsp_message_get_header (message,
                  GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK) {
            /* there is, prepare to read the body */
            builder->body_len = atol (hdrval);
            builder->body_data = g_malloc (builder->body_len + 1);
            builder->body_data[builder->body_len] = '\0';
            builder->offset = 0;
            builder->state = STATE_DATA_BODY;
          } else {
            builder->state = STATE_END;
          }
          break;
        }

        /* we have a line */
        if (builder->line == 0) {
          /* first line, check for response status */
          if (memcmp (builder->buffer, "RTSP", 4) == 0) {
            res = parse_response_status (builder->buffer, message);
          } else {
            res = parse_request_line (conn, builder->buffer, message);
          }
          /* the first line must parse without errors */
          if (res != GST_RTSP_OK)
            goto done;
        } else {
          /* else just parse the line, ignore errors */
          parse_line (conn, builder->buffer, message);
        }
        builder->line++;
        builder->offset = 0;
        break;
      }
      case STATE_END:
      {
        gchar *session_id;

        if (conn->tstate == TUNNEL_STATE_GET) {
          res = GST_RTSP_ETGET;
          goto done;
        } else if (conn->tstate == TUNNEL_STATE_POST) {
          res = GST_RTSP_ETPOST;
          goto done;
        }

        if (message->type == GST_RTSP_MESSAGE_DATA) {
          /* data messages don't have headers */
          res = GST_RTSP_OK;
          goto done;
        }

        /* save session id in the connection for further use */
        if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
            gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
                &session_id, 0) == GST_RTSP_OK) {
          gint maxlen, i;

          maxlen = sizeof (conn->session_id) - 1;
          /* the sessionid can have attributes marked with ;
           * Make sure we strip them */
          for (i = 0; session_id[i] != '\0'; i++) {
            if (session_id[i] == ';') {
              maxlen = i;
              /* parse timeout */
              do {
                i++;
              } while (g_ascii_isspace (session_id[i]));
              if (g_str_has_prefix (&session_id[i], "timeout=")) {
                gint to;

                /* if we parsed something valid, configure */
                if ((to = atoi (&session_id[i + 8])) > 0)
                  conn->timeout = to;
              }
              break;
            }
          }

          /* make sure to not overflow */
          strncpy (conn->session_id, session_id, maxlen);
          conn->session_id[maxlen] = '\0';
        }
        res = GST_RTSP_OK;
        goto done;
      }
      default:
        res = GST_RTSP_ERROR;
        break;
    }
  }
done:
  return res;
}

/**
 * gst_rtsp_connection_read:
 * @conn: a #GstRTSPConnection
 * @data: the data to read
 * @size: the size of @data
 * @timeout: a timeout value or #NULL
 *
 * Attempt to read @size bytes into @data from the connected @conn, blocking up to
 * the specified @timeout. @timeout can be #NULL, in which case this function
 * might block forever.
 *
 * This function can be cancelled with gst_rtsp_connection_flush().
 *
 * Returns: #GST_RTSP_OK on success.
 */
GstRTSPResult
gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
    GTimeVal * timeout)
{
  guint offset;
  gint retval;
  GstClockTime to;
  GstRTSPResult res;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);

  if (G_UNLIKELY (size == 0))
    return GST_RTSP_OK;

  offset = 0;

  /* configure timeout if any */
  to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;

  gst_poll_set_controllable (conn->fdset, TRUE);
  gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
  gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);

  while (TRUE) {
    res = read_bytes (conn->readfd->fd, data, &offset, size, conn->ctxp);
    if (G_UNLIKELY (res == GST_RTSP_EEOF))
      goto eof;
    if (G_LIKELY (res == GST_RTSP_OK))
      break;
    if (G_UNLIKELY (res != GST_RTSP_EINTR))
      goto read_error;

    do {
      retval = gst_poll_wait (conn->fdset, to);
    } while (retval == -1 && (errno == EINTR || errno == EAGAIN));

    /* check for timeout */
    if (G_UNLIKELY (retval == 0))
      goto select_timeout;

    if (G_UNLIKELY (retval == -1)) {
      if (errno == EBUSY)
        goto stopped;
      else
        goto select_error;
    }
    gst_poll_set_controllable (conn->fdset, FALSE);
  }
  return GST_RTSP_OK;

  /* ERRORS */
select_error:
  {
    return GST_RTSP_ESYS;
  }
select_timeout:
  {
    return GST_RTSP_ETIMEOUT;
  }
stopped:
  {
    return GST_RTSP_EINTR;
  }
eof:
  {
    return GST_RTSP_EEOF;
  }
read_error:
  {
    return res;
  }
}

static GString *
gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code)
{
  GString *str;
  gchar date_string[100];
  const gchar *status;

  gen_date_string (date_string, sizeof (date_string));

  status = gst_rtsp_status_as_text (code);
  if (status == NULL) {
    code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;
    status = "Internal Server Error";
  }

  str = g_string_new ("");

  /* */
  g_string_append_printf (str, "HTTP/1.0 %d %s\r\n", code, status);
  g_string_append_printf (str,
      "Server: GStreamer RTSP Server\r\n"
      "Date: %s\r\n"
      "Connection: close\r\n"
      "Cache-Control: no-store\r\n" "Pragma: no-cache\r\n", date_string);
  if (code == GST_RTSP_STS_OK) {
    if (conn->ip)
      g_string_append_printf (str, "x-server-ip-address: %s\r\n", conn->ip);
    g_string_append_printf (str,
        "Content-Type: application/x-rtsp-tunnelled\r\n");
  }
  g_string_append_printf (str, "\r\n");
  return str;
}

/**
 * gst_rtsp_connection_receive:
 * @conn: a #GstRTSPConnection
 * @message: the message to read
 * @timeout: a timeout value or #NULL
 *
 * Attempt to read into @message from the connected @conn, blocking up to
 * the specified @timeout. @timeout can be #NULL, in which case this function
 * might block forever.
 * 
 * This function can be cancelled with gst_rtsp_connection_flush().
 *
 * Returns: #GST_RTSP_OK on success.
 */
GstRTSPResult
gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
    GTimeVal * timeout)
{
  GstRTSPResult res;
  GstRTSPBuilder builder;
  gint retval;
  GstClockTime to;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);

  /* configure timeout if any */
  to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;

  gst_poll_set_controllable (conn->fdset, TRUE);
  gst_poll_fd_ctl_write (conn->fdset, conn->writefd, FALSE);
  gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);

  memset (&builder, 0, sizeof (GstRTSPBuilder));
  while (TRUE) {
    res = build_next (&builder, message, conn);
    if (G_UNLIKELY (res == GST_RTSP_EEOF))
      goto eof;
    if (G_LIKELY (res == GST_RTSP_OK))
      break;
    if (res == GST_RTSP_ETGET) {
      GString *str;

      /* tunnel GET request, we can reply now */
      str = gen_tunnel_reply (conn, GST_RTSP_STS_OK);
      res =
          gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len,
          timeout);
      g_string_free (str, TRUE);
    } else if (res == GST_RTSP_ETPOST) {
      /* tunnel POST request, return the value, the caller now has to link the
       * two connections. */
      break;
    } else if (G_UNLIKELY (res != GST_RTSP_EINTR))
      goto read_error;

    do {
      retval = gst_poll_wait (conn->fdset, to);
    } while (retval == -1 && (errno == EINTR || errno == EAGAIN));

    /* check for timeout */
    if (G_UNLIKELY (retval == 0))
      goto select_timeout;

    if (G_UNLIKELY (retval == -1)) {
      if (errno == EBUSY)
        goto stopped;
      else
        goto select_error;
    }
    gst_poll_set_controllable (conn->fdset, FALSE);
  }

  /* we have a message here */
  build_reset (&builder);

  return GST_RTSP_OK;

  /* ERRORS */
select_error:
  {
    res = GST_RTSP_ESYS;
    goto cleanup;
  }
select_timeout:
  {
    res = GST_RTSP_ETIMEOUT;
    goto cleanup;
  }
stopped:
  {
    res = GST_RTSP_EINTR;
    goto cleanup;
  }
eof:
  {
    res = GST_RTSP_EEOF;
    goto cleanup;
  }
read_error:
cleanup:
  {
    build_reset (&builder);
    gst_rtsp_message_unset (message);
    return res;
  }
}

/**
 * gst_rtsp_connection_close:
 * @conn: a #GstRTSPConnection
 *
 * Close the connected @conn. After this call, the connection is in the same
 * state as when it was first created.
 * 
 * Returns: #GST_RTSP_OK on success.
 */
GstRTSPResult
gst_rtsp_connection_close (GstRTSPConnection * conn)
{
  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);

  g_free (conn->ip);
  conn->ip = NULL;

  REMOVE_POLLFD (conn->fdset, &conn->fd0);
  REMOVE_POLLFD (conn->fdset, &conn->fd1);
  conn->writefd = NULL;
  conn->readfd = NULL;
  conn->tunneled = FALSE;
  conn->tstate = TUNNEL_STATE_NONE;
  conn->ctxp = NULL;
  g_free (conn->username);
  conn->username = NULL;
  g_free (conn->passwd);
  conn->passwd = NULL;
  gst_rtsp_connection_clear_auth_params (conn);
  conn->timeout = 60;
  conn->cseq = 0;
  conn->session_id[0] = '\0';

  return GST_RTSP_OK;
}

/**
 * gst_rtsp_connection_free:
 * @conn: a #GstRTSPConnection
 *
 * Close and free @conn.
 * 
 * Returns: #GST_RTSP_OK on success.
 */
GstRTSPResult
gst_rtsp_connection_free (GstRTSPConnection * conn)
{
  GstRTSPResult res;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);

  res = gst_rtsp_connection_close (conn);
  gst_poll_free (conn->fdset);
  g_timer_destroy (conn->timer);
  gst_rtsp_url_free (conn->url);
  g_free (conn->proxy_host);
  g_free (conn);
#ifdef G_OS_WIN32
  WSACleanup ();
#endif

  return res;
}

/**
 * gst_rtsp_connection_poll:
 * @conn: a #GstRTSPConnection
 * @events: a bitmask of #GstRTSPEvent flags to check
 * @revents: location for result flags 
 * @timeout: a timeout
 *
 * Wait up to the specified @timeout for the connection to become available for
 * at least one of the operations specified in @events. When the function returns
 * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
 * @conn.
 *
 * @timeout can be #NULL, in which case this function might block forever.
 *
 * This function can be cancelled with gst_rtsp_connection_flush().
 * 
 * Returns: #GST_RTSP_OK on success.
 *
 * Since: 0.10.15
 */
GstRTSPResult
gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
    GstRTSPEvent * revents, GTimeVal * timeout)
{
  GstClockTime to;
  gint retval;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
  g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);

  gst_poll_set_controllable (conn->fdset, TRUE);

  /* add fd to writer set when asked to */
  gst_poll_fd_ctl_write (conn->fdset, conn->writefd,
      events & GST_RTSP_EV_WRITE);

  /* add fd to reader set when asked to */
  gst_poll_fd_ctl_read (conn->fdset, conn->readfd, events & GST_RTSP_EV_READ);

  /* configure timeout if any */
  to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;

  do {
    retval = gst_poll_wait (conn->fdset, to);
  } while (retval == -1 && (errno == EINTR || errno == EAGAIN));

  if (G_UNLIKELY (retval == 0))
    goto select_timeout;

  if (G_UNLIKELY (retval == -1)) {
    if (errno == EBUSY)
      goto stopped;
    else
      goto select_error;
  }

  *revents = 0;
  if (events & GST_RTSP_EV_READ) {
    if (gst_poll_fd_can_read (conn->fdset, conn->readfd))
      *revents |= GST_RTSP_EV_READ;
  }
  if (events & GST_RTSP_EV_WRITE) {
    if (gst_poll_fd_can_write (conn->fdset, conn->writefd))
      *revents |= GST_RTSP_EV_WRITE;
  }
  return GST_RTSP_OK;

  /* ERRORS */
select_timeout:
  {
    return GST_RTSP_ETIMEOUT;
  }
select_error:
  {
    return GST_RTSP_ESYS;
  }
stopped:
  {
    return GST_RTSP_EINTR;
  }
}

/**
 * gst_rtsp_connection_next_timeout:
 * @conn: a #GstRTSPConnection
 * @timeout: a timeout
 *
 * Calculate the next timeout for @conn, storing the result in @timeout.
 * 
 * Returns: #GST_RTSP_OK.
 */
GstRTSPResult
gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
{
  gdouble elapsed;
  glong sec;
  gulong usec;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);

  elapsed = g_timer_elapsed (conn->timer, &usec);
  if (elapsed >= conn->timeout) {
    sec = 0;
    usec = 0;
  } else {
    sec = conn->timeout - elapsed;
  }

  timeout->tv_sec = sec;
  timeout->tv_usec = usec;

  return GST_RTSP_OK;
}

/**
 * gst_rtsp_connection_reset_timeout:
 * @conn: a #GstRTSPConnection
 *
 * Reset the timeout of @conn.
 * 
 * Returns: #GST_RTSP_OK.
 */
GstRTSPResult
gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
{
  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);

  g_timer_start (conn->timer);

  return GST_RTSP_OK;
}

/**
 * gst_rtsp_connection_flush:
 * @conn: a #GstRTSPConnection
 * @flush: start or stop the flush
 *
 * Start or stop the flushing action on @conn. When flushing, all current
 * and future actions on @conn will return #GST_RTSP_EINTR until the connection
 * is set to non-flushing mode again.
 * 
 * Returns: #GST_RTSP_OK.
 */
GstRTSPResult
gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
{
  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);

  gst_poll_set_flushing (conn->fdset, flush);

  return GST_RTSP_OK;
}

/**
 * gst_rtsp_connection_set_proxy:
 * @conn: a #GstRTSPConnection
 * @host: the proxy host
 * @port: the proxy port
 *
 * Set the proxy host and port.
 * 
 * Returns: #GST_RTSP_OK.
 *
 * Since: 0.10.23
 */
GstRTSPResult
gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
    const gchar * host, guint port)
{
  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);

  g_free (conn->proxy_host);
  conn->proxy_host = g_strdup (host);
  conn->proxy_port = port;

  return GST_RTSP_OK;
}

/**
 * gst_rtsp_connection_set_auth:
 * @conn: a #GstRTSPConnection
 * @method: authentication method
 * @user: the user
 * @pass: the password
 *
 * Configure @conn for authentication mode @method with @user and @pass as the
 * user and password respectively.
 * 
 * Returns: #GST_RTSP_OK.
 */
GstRTSPResult
gst_rtsp_connection_set_auth (GstRTSPConnection * conn,
    GstRTSPAuthMethod method, const gchar * user, const gchar * pass)
{
  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);

  if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL)
          || g_strrstr (user, ":") != NULL))
    return GST_RTSP_EINVAL;

  /* Make sure the username and passwd are being set for authentication */
  if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL))
    return GST_RTSP_EINVAL;

  /* ":" chars are not allowed in usernames for basic auth */
  if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL)
    return GST_RTSP_EINVAL;

  g_free (conn->username);
  g_free (conn->passwd);

  conn->auth_method = method;
  conn->username = g_strdup (user);
  conn->passwd = g_strdup (pass);

  return GST_RTSP_OK;
}

/**
 * str_case_hash:
 * @key: ASCII string to hash
 *
 * Hashes @key in a case-insensitive manner.
 *
 * Returns: the hash code.
 **/
static guint
str_case_hash (gconstpointer key)
{
  const char *p = key;
  guint h = g_ascii_toupper (*p);

  if (h)
    for (p += 1; *p != '\0'; p++)
      h = (h << 5) - h + g_ascii_toupper (*p);

  return h;
}

/**
 * str_case_equal:
 * @v1: an ASCII string
 * @v2: another ASCII string
 *
 * Compares @v1 and @v2 in a case-insensitive manner
 *
 * Returns: %TRUE if they are equal (modulo case)
 **/
static gboolean
str_case_equal (gconstpointer v1, gconstpointer v2)
{
  const char *string1 = v1;
  const char *string2 = v2;

  return g_ascii_strcasecmp (string1, string2) == 0;
}

/**
 * gst_rtsp_connection_set_auth_param:
 * @conn: a #GstRTSPConnection
 * @param: authentication directive
 * @value: value
 *
 * Setup @conn with authentication directives. This is not necesary for
 * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For
 * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
 * in the WWW-Authenticate response header and can include realm, domain,
 * nonce, opaque, stale, algorithm, qop as per RFC2617.
 * 
 * Since: 0.10.20
 */
void
gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
    const gchar * param, const gchar * value)
{
  g_return_if_fail (conn != NULL);
  g_return_if_fail (param != NULL);

  if (conn->auth_params == NULL) {
    conn->auth_params =
        g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free);
  }
  g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value));
}

/**
 * gst_rtsp_connection_clear_auth_params:
 * @conn: a #GstRTSPConnection
 *
 * Clear the list of authentication directives stored in @conn.
 *
 * Since: 0.10.20
 */
void
gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
{
  g_return_if_fail (conn != NULL);

  if (conn->auth_params != NULL) {
    g_hash_table_destroy (conn->auth_params);
    conn->auth_params = NULL;
  }
}

static GstRTSPResult
set_qos_dscp (gint fd, guint qos_dscp)
{
  union gst_sockaddr sa;
  socklen_t slen = sizeof (sa);
  gint af;
  gint tos;

  if (fd == -1)
    return GST_RTSP_OK;

  if (getsockname (fd, &sa.sa, &slen) < 0)
    goto no_getsockname;

  af = sa.sa.sa_family;

  /* if this is an IPv4-mapped address then do IPv4 QoS */
  if (af == AF_INET6) {
    if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr))
      af = AF_INET;
  }

  /* extract and shift 6 bits of the DSCP */
  tos = (qos_dscp & 0x3f) << 2;

  switch (af) {
    case AF_INET:
      if (SETSOCKOPT (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0)
        goto no_setsockopt;
      break;
    case AF_INET6:
#ifdef IPV6_TCLASS
      if (SETSOCKOPT (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0)
        goto no_setsockopt;
      break;
#endif
    default:
      goto wrong_family;
  }

  return GST_RTSP_OK;

  /* ERRORS */
no_getsockname:
no_setsockopt:
  {
    return GST_RTSP_ESYS;
  }

wrong_family:
  {
    return GST_RTSP_ERROR;
  }
}

/**
 * gst_rtsp_connection_set_qos_dscp:
 * @conn: a #GstRTSPConnection
 * @qos_dscp: DSCP value
 *
 * Configure @conn to use the specified DSCP value.
 *
 * Returns: #GST_RTSP_OK on success.
 *
 * Since: 0.10.20
 */
GstRTSPResult
gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
{
  GstRTSPResult res;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->readfd != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->writefd != NULL, GST_RTSP_EINVAL);

  res = set_qos_dscp (conn->fd0.fd, qos_dscp);
  if (res == GST_RTSP_OK)
    res = set_qos_dscp (conn->fd1.fd, qos_dscp);

  return res;
}


/**
 * gst_rtsp_connection_get_url:
 * @conn: a #GstRTSPConnection
 *
 * Retrieve the URL of the other end of @conn.
 *
 * Returns: The URL. This value remains valid until the
 * connection is freed.
 *
 * Since: 0.10.23
 */
GstRTSPUrl *
gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
{
  g_return_val_if_fail (conn != NULL, NULL);

  return conn->url;
}

/**
 * gst_rtsp_connection_get_ip:
 * @conn: a #GstRTSPConnection
 *
 * Retrieve the IP address of the other end of @conn.
 *
 * Returns: The IP address as a string. this value remains valid until the
 * connection is closed.
 *
 * Since: 0.10.20
 */
const gchar *
gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
{
  g_return_val_if_fail (conn != NULL, NULL);

  return conn->ip;
}

/**
 * gst_rtsp_connection_set_ip:
 * @conn: a #GstRTSPConnection
 * @ip: an ip address
 *
 * Set the IP address of the server.
 *
 * Since: 0.10.23
 */
void
gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
{
  g_return_if_fail (conn != NULL);

  g_free (conn->ip);
  conn->ip = g_strdup (ip);
}

/**
 * gst_rtsp_connection_get_readfd:
 * @conn: a #GstRTSPConnection
 *
 * Get the file descriptor for reading.
 *
 * Returns: the file descriptor used for reading or -1 on error. The file
 * descriptor remains valid until the connection is closed.
 *
 * Since: 0.10.23
 */
gint
gst_rtsp_connection_get_readfd (const GstRTSPConnection * conn)
{
  g_return_val_if_fail (conn != NULL, -1);
  g_return_val_if_fail (conn->readfd != NULL, -1);

  return conn->readfd->fd;
}

/**
 * gst_rtsp_connection_get_writefd:
 * @conn: a #GstRTSPConnection
 *
 * Get the file descriptor for writing.
 *
 * Returns: the file descriptor used for writing or -1 on error. The file
 * descriptor remains valid until the connection is closed.
 *
 * Since: 0.10.23
 */
gint
gst_rtsp_connection_get_writefd (const GstRTSPConnection * conn)
{
  g_return_val_if_fail (conn != NULL, -1);
  g_return_val_if_fail (conn->writefd != NULL, -1);

  return conn->writefd->fd;
}


/**
 * gst_rtsp_connection_set_tunneled:
 * @conn: a #GstRTSPConnection
 * @tunneled: the new state
 *
 * Set the HTTP tunneling state of the connection. This must be configured before
 * the @conn is connected.
 *
 * Since: 0.10.23
 */
void
gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
{
  g_return_if_fail (conn != NULL);
  g_return_if_fail (conn->readfd == NULL);
  g_return_if_fail (conn->writefd == NULL);

  conn->tunneled = tunneled;
}

/**
 * gst_rtsp_connection_is_tunneled:
 * @conn: a #GstRTSPConnection
 *
 * Get the tunneling state of the connection. 
 *
 * Returns: if @conn is using HTTP tunneling.
 *
 * Since: 0.10.23
 */
gboolean
gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
{
  g_return_val_if_fail (conn != NULL, FALSE);

  return conn->tunneled;
}

/**
 * gst_rtsp_connection_get_tunnelid:
 * @conn: a #GstRTSPConnection
 *
 * Get the tunnel session id the connection. 
 *
 * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
 *
 * Since: 0.10.23
 */
const gchar *
gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
{
  g_return_val_if_fail (conn != NULL, NULL);

  if (!conn->tunneled)
    return NULL;

  return conn->tunnelid;
}

/**
 * gst_rtsp_connection_do_tunnel:
 * @conn: a #GstRTSPConnection
 * @conn2: a #GstRTSPConnection
 *
 * If @conn received the first tunnel connection and @conn2 received
 * the second tunnel connection, link the two connections together so that
 * @conn manages the tunneled connection.
 *
 * After this call, @conn2 cannot be used anymore and must be freed with
 * gst_rtsp_connection_free().
 *
 * Returns: return GST_RTSP_OK on success.
 *
 * Since: 0.10.23
 */
GstRTSPResult
gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
    GstRTSPConnection * conn2)
{
  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn2 != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL);
  g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, TUNNELID_LEN),
      GST_RTSP_EINVAL);

  /* both connections have fd0 as the read/write socket. start by taking the
   * socket from conn2 and set it as the socket in conn */
  conn->fd1 = conn2->fd0;

  /* clean up some of the state of conn2 */
  gst_poll_remove_fd (conn2->fdset, &conn2->fd0);
  conn2->fd0.fd = -1;
  conn2->readfd = conn2->writefd = NULL;

  /* We make fd0 the write socket and fd1 the read socket. */
  conn->writefd = &conn->fd0;
  conn->readfd = &conn->fd1;

  conn->tstate = TUNNEL_STATE_COMPLETE;

  /* we need base64 decoding for the readfd */
  conn->ctx.state = 0;
  conn->ctx.save = 0;
  conn->ctx.cout = 0;
  conn->ctx.coutl = 0;
  conn->ctxp = &conn->ctx;

  return GST_RTSP_OK;
}

#define READ_COND   (G_IO_IN | G_IO_HUP | G_IO_ERR)
#define WRITE_COND  (G_IO_OUT | G_IO_ERR)

typedef struct
{
  guint8 *data;
  guint size;
  guint id;
} GstRTSPRec;

/* async functions */
struct _GstRTSPWatch
{
  GSource source;

  GstRTSPConnection *conn;

  GstRTSPBuilder builder;
  GstRTSPMessage message;

  GPollFD readfd;
  GPollFD writefd;
  gboolean write_added;

  /* queued message for transmission */
  guint id;
  GAsyncQueue *messages;
  guint8 *write_data;
  guint write_off;
  guint write_size;
  guint write_id;

  GstRTSPWatchFuncs funcs;

  gpointer user_data;
  GDestroyNotify notify;
};

static gboolean
gst_rtsp_source_prepare (GSource * source, gint * timeout)
{
  GstRTSPWatch *watch = (GstRTSPWatch *) source;

  *timeout = (watch->conn->timeout * 1000);

  return FALSE;
}

static gboolean
gst_rtsp_source_check (GSource * source)
{
  GstRTSPWatch *watch = (GstRTSPWatch *) source;

  if (watch->readfd.revents & READ_COND)
    return TRUE;

  if (watch->writefd.revents & WRITE_COND)
    return TRUE;

  return FALSE;
}

static gboolean
gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
    gpointer user_data G_GNUC_UNUSED)
{
  GstRTSPWatch *watch = (GstRTSPWatch *) source;
  GstRTSPResult res;

  /* first read as much as we can */
  if (watch->readfd.revents & READ_COND) {
    do {
      res = build_next (&watch->builder, &watch->message, watch->conn);
      if (res == GST_RTSP_EINTR)
        break;
      if (G_UNLIKELY (res == GST_RTSP_EEOF))
        goto eof;
      if (res == GST_RTSP_ETGET) {
        GString *str;
        GstRTSPStatusCode code;
        guint size;

        if (watch->funcs.tunnel_start)
          code = watch->funcs.tunnel_start (watch, watch->user_data);
        else
          code = GST_RTSP_STS_OK;

        /* queue the response string */
        str = gen_tunnel_reply (watch->conn, code);
        size = str->len;
        gst_rtsp_watch_queue_data (watch, (guint8 *) g_string_free (str, FALSE),
            size);
      } else if (res == GST_RTSP_ETPOST) {
        /* in the callback the connection should be tunneled with the
         * GET connection */
        if (watch->funcs.tunnel_complete)
          watch->funcs.tunnel_complete (watch, watch->user_data);
      } else if (G_UNLIKELY (res != GST_RTSP_OK))
        goto error;

      if (G_LIKELY (res == GST_RTSP_OK)) {
        if (watch->funcs.message_received)
          watch->funcs.message_received (watch, &watch->message,
              watch->user_data);

        gst_rtsp_message_unset (&watch->message);
      }
      build_reset (&watch->builder);
    } while (FALSE);
  }

  if (watch->writefd.revents & WRITE_COND) {
    do {
      if (watch->write_data == NULL) {
        GstRTSPRec *rec;

        /* get a new message from the queue */
        rec = g_async_queue_try_pop (watch->messages);
        if (rec == NULL)
          goto done;

        watch->write_off = 0;
        watch->write_data = rec->data;
        watch->write_size = rec->size;
        watch->write_id = rec->id;

        g_slice_free (GstRTSPRec, rec);
      }

      res = write_bytes (watch->writefd.fd, watch->write_data,
          &watch->write_off, watch->write_size);
      if (res == GST_RTSP_EINTR)
        break;
      if (G_UNLIKELY (res != GST_RTSP_OK))
        goto error;

      if (watch->funcs.message_sent)
        watch->funcs.message_sent (watch, watch->write_id, watch->user_data);

    done:
      if (g_async_queue_length (watch->messages) == 0 && watch->write_added) {
        g_source_remove_poll ((GSource *) watch, &watch->writefd);
        watch->write_added = FALSE;
        watch->writefd.revents = 0;
      }
      g_free (watch->write_data);
      watch->write_data = NULL;
    } while (FALSE);
  }

  return TRUE;

  /* ERRORS */
eof:
  {
    if (watch->funcs.closed)
      watch->funcs.closed (watch, watch->user_data);
    return FALSE;
  }
error:
  {
    if (watch->funcs.error)
      watch->funcs.error (watch, res, watch->user_data);
    return FALSE;
  }
}

static void
gst_rtsp_rec_free (gpointer data)
{
  GstRTSPRec *rec = data;

  g_free (rec->data);
  g_slice_free (GstRTSPRec, rec);
}

static void
gst_rtsp_source_finalize (GSource * source)
{
  GstRTSPWatch *watch = (GstRTSPWatch *) source;

  build_reset (&watch->builder);
  gst_rtsp_message_unset (&watch->message);

  g_async_queue_unref (watch->messages);
  watch->messages = NULL;

  g_free (watch->write_data);

  if (watch->notify)
    watch->notify (watch->user_data);
}

static GSourceFuncs gst_rtsp_source_funcs = {
  gst_rtsp_source_prepare,
  gst_rtsp_source_check,
  gst_rtsp_source_dispatch,
  gst_rtsp_source_finalize,
  NULL,
  NULL
};

/**
 * gst_rtsp_watch_new:
 * @conn: a #GstRTSPConnection
 * @funcs: watch functions
 * @user_data: user data to pass to @funcs
 * @notify: notify when @user_data is not referenced anymore
 *
 * Create a watch object for @conn. The functions provided in @funcs will be
 * called with @user_data when activity happened on the watch.
 *
 * The new watch is usually created so that it can be attached to a
 * maincontext with gst_rtsp_watch_attach(). 
 *
 * @conn must exist for the entire lifetime of the watch.
 *
 * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
 * communication. Free with gst_rtsp_watch_unref () after usage.
 *
 * Since: 0.10.23
 */
GstRTSPWatch *
gst_rtsp_watch_new (GstRTSPConnection * conn,
    GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify)
{
  GstRTSPWatch *result;

  g_return_val_if_fail (conn != NULL, NULL);
  g_return_val_if_fail (funcs != NULL, NULL);
  g_return_val_if_fail (conn->readfd != NULL, NULL);
  g_return_val_if_fail (conn->writefd != NULL, NULL);

  result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs,
      sizeof (GstRTSPWatch));

  result->conn = conn;
  result->builder.state = STATE_START;

  result->messages = g_async_queue_new_full (gst_rtsp_rec_free);

  result->readfd.fd = -1;
  result->writefd.fd = -1;

  gst_rtsp_watch_reset (result);

  result->funcs = *funcs;
  result->user_data = user_data;
  result->notify = notify;

  /* only add the read fd, the write fd is only added when we have data
   * to send. */
  g_source_add_poll ((GSource *) result, &result->readfd);

  return result;
}

/**
 * gst_rtsp_watch_reset:
 * @watch: a #GstRTSPWatch
 *
 * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
 * when the file descriptors of the connection might have changed.
 *
 * Since: 0.10.23
 */
void
gst_rtsp_watch_reset (GstRTSPWatch * watch)
{
  if (watch->readfd.fd != -1)
    g_source_remove_poll ((GSource *) watch, &watch->readfd);
  if (watch->writefd.fd != -1)
    g_source_remove_poll ((GSource *) watch, &watch->writefd);

  watch->readfd.fd = watch->conn->readfd->fd;
  watch->readfd.events = READ_COND;
  watch->readfd.revents = 0;

  watch->writefd.fd = watch->conn->writefd->fd;
  watch->writefd.events = WRITE_COND;
  watch->writefd.revents = 0;
  watch->write_added = FALSE;

  g_source_add_poll ((GSource *) watch, &watch->readfd);
}

/**
 * gst_rtsp_watch_attach:
 * @watch: a #GstRTSPWatch
 * @context: a GMainContext (if NULL, the default context will be used)
 *
 * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
 *
 * Returns: the ID (greater than 0) for the watch within the GMainContext. 
 *
 * Since: 0.10.23
 */
guint
gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
{
  g_return_val_if_fail (watch != NULL, 0);

  return g_source_attach ((GSource *) watch, context);
}

/**
 * gst_rtsp_watch_unref:
 * @watch: a #GstRTSPWatch
 *
 * Decreases the reference count of @watch by one. If the resulting reference
 * count is zero the watch and associated memory will be destroyed.
 *
 * Since: 0.10.23
 */
void
gst_rtsp_watch_unref (GstRTSPWatch * watch)
{
  g_return_if_fail (watch != NULL);

  g_source_unref ((GSource *) watch);
}

/**
 * gst_rtsp_watch_queue_data:
 * @watch: a #GstRTSPWatch
 * @data: the data to queue
 * @size: the size of @data
 *
 * Queue @data for transmission in @watch. It will be transmitted when the
 * connection of the @watch becomes writable.
 *
 * This function will take ownership of @data and g_free() it after use.
 *
 * The return value of this function will be used as the id argument in the
 * message_sent callback.
 *
 * Returns: an id.
 *
 * Since: 0.10.24
 */
guint
gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
    guint size)
{
  GstRTSPRec *rec;

  g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);

  /* make a record with the data and id */
  rec = g_slice_new (GstRTSPRec);
  rec->data = (guint8 *) data;
  rec->size = size;
  do {
    /* make sure rec->id is never 0 */
    rec->id = ++watch->id;
  } while (G_UNLIKELY (rec->id == 0));

  /* add the record to a queue. FIXME we would like to have an upper limit here */
  g_async_queue_push (watch->messages, rec);

  /* FIXME: does the following need to be made thread-safe? (this might be
   * called from a streaming thread, like appsink's render function) */
  /* make sure the main context will now also check for writability on the
   * socket */
  if (!watch->write_added) {
    g_source_add_poll ((GSource *) watch, &watch->writefd);
    watch->write_added = TRUE;
  }

  return rec->id;
}

/**
 * gst_rtsp_watch_queue_message:
 * @watch: a #GstRTSPWatch
 * @message: a #GstRTSPMessage
 *
 * Queue a @message for transmission in @watch. The contents of this
 * message will be serialized and transmitted when the connection of the
 * @watch becomes writable.
 *
 * The return value of this function will be used as the id argument in the
 * message_sent callback.
 *
 * Returns: an id.
 *
 * Since: 0.10.23
 */
guint
gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
{
  GString *str;
  guint size;

  g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);

  /* make a record with the message as a string and id */
  str = message_to_string (watch->conn, message);
  size = str->len;
  return gst_rtsp_watch_queue_data (watch,
      (guint8 *) g_string_free (str, FALSE), size);
}