summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorColin Guthrie <pulse@colin.guthr.ie>2008-05-07 00:35:10 +0000
committerColin Guthrie <pulse@colin.guthr.ie>2008-10-08 20:32:07 +0100
commit6510d97315b9bdf7b1afc204c3dca0a2b0a3a528 (patch)
tree5ae6076d53b63e6474b57dbfbf26f147b1d39f47
parent22e299ad3e16d1a2636653a7be9d625ecdc23802 (diff)
Use a more stateful response parser.
This makes things fully asyncronous. Some of the continuation headerlist stuff could be moved to headerlist for neatness, but this is OK for now. git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/coling@2373 fefdeb5f-60dc-0310-8127-8f9354f1896f
-rw-r--r--src/modules/rtp/rtsp.c320
1 files changed, 138 insertions, 182 deletions
diff --git a/src/modules/rtp/rtsp.c b/src/modules/rtp/rtsp.c
index 4f2411ab..44cd80b9 100644
--- a/src/modules/rtp/rtsp.c
+++ b/src/modules/rtp/rtsp.c
@@ -46,84 +46,35 @@
#include <pulsecore/macro.h>
#include <pulsecore/strbuf.h>
#include <pulsecore/poll.h>
+#include <pulsecore/ioline.h>
#include "rtsp.h"
struct pa_rtsp_context {
pa_socket_client *sc;
pa_iochannel *io;
- pa_rtsp_cb_t callback;
- void* userdata;
- const char* useragent;
- pa_headerlist* headers;
- char* localip;
- char* url;
- uint32_t port;
- uint32_t cseq;
- char* session;
- char* transport;
- pa_rtsp_state state;
-};
+ pa_ioline *ioline;
-/*
- * read one line from the file descriptor
- * timeout: msec unit, -1 for infinite
- * if CR comes then following LF is expected
- * returned string in line is always null terminated, maxlen-1 is maximum string length
- */
-static int pa_read_line(pa_iochannel* io, char *line, int maxlen, int timeout)
-{
- int i, rval;
- int count;
- int fd;
- char ch;
- struct pollfd pfds;
-
- pa_assert(io);
- fd = pa_iochannel_get_recv_fd(io);
-
- count = 0;
- *line = 0;
- pfds.events = POLLIN;
- pfds.fd = fd;
-
- for (i=0; i<maxlen; ++i) {
- if (!poll(&pfds, 1, timeout))
- return 0;
-
- rval = read(fd, &ch, 1);
-
- if (-1 == rval) {
- if (EAGAIN == errno)
- return 0;
- /*ERRMSG("%s:read error: %s\n", __func__, strerror(errno));*/
- return -1;
- }
-
- if (0 == rval) {
- /*INFMSG("%s:disconnected on the other end\n", __func__);*/
- return -1;
- }
-
- if ('\n' == ch) {
- *line = 0;
- return count;
- }
-
- if ('\r' == ch)
- continue;
+ pa_rtsp_cb_t callback;
- *line++ = ch;
- count++;
+ void *userdata;
+ const char *useragent;
- if (count >= maxlen-1)
- break;
- }
+ pa_rtsp_state state;
+ uint8_t waiting;
- *line = 0;
- return count;
-}
+ pa_headerlist* headers;
+ char *last_header;
+ pa_strbuf *header_buffer;
+ pa_headerlist* response_headers;
+ char *localip;
+ char *url;
+ uint32_t port;
+ uint32_t cseq;
+ char *session;
+ char *transport;
+};
static int pa_rtsp_exec(pa_rtsp_context* c, const char* cmd,
const char* content_type, const char* content,
@@ -172,8 +123,8 @@ static int pa_rtsp_exec(pa_rtsp_context* c, const char* cmd,
/* Our packet is created... now we can send it :) */
hdrs = pa_strbuf_tostring_free(buf);
- pa_log_debug("Submitting request:");
- pa_log_debug(hdrs);
+ /*pa_log_debug("Submitting request:");
+ pa_log_debug(hdrs);*/
l = pa_iochannel_write(c->io, hdrs, strlen(hdrs));
pa_xfree(hdrs);
@@ -205,125 +156,39 @@ void pa_rtsp_context_free(pa_rtsp_context* c) {
pa_xfree(c->localip);
pa_xfree(c->session);
pa_xfree(c->transport);
+ pa_xfree(c->last_header);
+ if (c->header_buffer)
+ pa_strbuf_free(c->header_buffer);
+ if (c->response_headers)
+ pa_headerlist_free(c->response_headers);
pa_headerlist_free(c->headers);
}
pa_xfree(c);
}
-static void io_callback(PA_GCC_UNUSED pa_iochannel *io, void *userdata) {
- pa_strbuf* buf;
- pa_headerlist* response_headers = NULL;
- char response[1024];
- int timeout;
+static void headers_read(pa_rtsp_context *c) {
char* token;
- char* header;
- char* delimpos;
- char delimiters[] = " ";
- pa_rtsp_context *c = userdata;
- pa_assert(c);
- pa_assert(c->io == io);
-
- if (!pa_iochannel_is_readable(c->io)) {
- if (STATE_SETUP == c->state || STATE_ANNOUNCE == c->state) return;
- goto do_callback;
- }
-
- /* TODO: convert this to a pa_ioline based reader */
- if (STATE_SETUP == c->state || STATE_ANNOUNCE == c->state) {
- response_headers = pa_headerlist_new();
- }
- timeout = 5000;
- /* read in any response headers */
- if (pa_read_line(c->io, response, sizeof(response), timeout) > 0) {
- const char* token_state = NULL;
- pa_log_debug("Response Line: %s", response);
-
- timeout = 1000;
- pa_xfree(pa_split(response, delimiters, &token_state));
- token = pa_split(response, delimiters, &token_state);
- if (!token || strcmp(token, "200")) {
- pa_xfree(token);
- pa_log("Invalid Response");
- /* TODO: Bail out completely */
- return;
- }
- pa_xfree(token);
-
- /* We want to return the headers? */
- if (!response_headers) {
- /* We have no storage, so just clear out the response. */
- while (pa_read_line(c->io, response, sizeof(response), timeout) > 0){
- pa_log_debug("Response Line: %s", response);
- }
- } else {
- /* TODO: Move header reading into the headerlist. */
- header = NULL;
- buf = pa_strbuf_new();
- while (pa_read_line(c->io, response, sizeof(response), timeout) > 0) {
- pa_log_debug("Response Line: %s", response);
- /* If the first character is a space, it's a continuation header */
- if (header && ' ' == response[0]) {
- /* Add this line to the buffer (sans the space. */
- pa_strbuf_puts(buf, &(response[1]));
- continue;
- }
-
- if (header) {
- /* This is not a continuation header so let's dump the full
- header/value into our proplist */
- pa_headerlist_puts(response_headers, header, pa_strbuf_tostring_free(buf));
- pa_xfree(header);
- buf = pa_strbuf_new();
- }
-
- delimpos = strstr(response, ":");
- if (!delimpos) {
- pa_log("Invalid response header");
- return;
- }
-
- if (strlen(delimpos) > 1) {
- /* Cut our line off so we can copy the header name out */
- *delimpos++ = '\0';
-
- /* Trim the front of any spaces */
- while (' ' == *delimpos)
- ++delimpos;
+ char delimiters[] = ";";
- pa_strbuf_puts(buf, delimpos);
- } else {
- /* Cut our line off so we can copy the header name out */
- *delimpos = '\0';
- }
-
- /* Save the header name */
- header = pa_xstrdup(response);
- }
- /* We will have a header left from our looping itteration, so add it in :) */
- if (header) {
- /* This is not a continuation header so let's dump it into our proplist */
- pa_headerlist_puts(response_headers, header, pa_strbuf_tostring(buf));
- }
- pa_strbuf_free(buf);
- }
- }
+ pa_assert(c);
+ pa_assert(c->response_headers);
/* Deal with a SETUP response */
if (STATE_SETUP == c->state) {
const char* token_state = NULL;
const char* pc = NULL;
- c->session = pa_xstrdup(pa_headerlist_gets(response_headers, "Session"));
- c->transport = pa_xstrdup(pa_headerlist_gets(response_headers, "Transport"));
+ c->session = pa_xstrdup(pa_headerlist_gets(c->response_headers, "Session"));
+ c->transport = pa_xstrdup(pa_headerlist_gets(c->response_headers, "Transport"));
if (!c->session || !c->transport) {
- pa_headerlist_free(response_headers);
+ pa_headerlist_free(c->response_headers);
+ c->response_headers = NULL;
+ pa_log("Invalid SETUP response.");
return;
}
/* Now parse out the server port component of the response. */
- c->port = 0;
- delimiters[0] = ';';
while ((token = pa_split(c->transport, delimiters, &token_state))) {
if ((pc = strstr(token, "="))) {
if (0 == strncmp(token, "server_port", 11)) {
@@ -336,33 +201,117 @@ static void io_callback(PA_GCC_UNUSED pa_iochannel *io, void *userdata) {
}
if (0 == c->port) {
/* Error no server_port in response */
- pa_headerlist_free(response_headers);
+ pa_headerlist_free(c->response_headers);
+ c->response_headers = NULL;
+ pa_log("Invalid SETUP response (no port number).");
return;
}
}
/* Call our callback */
-do_callback:
if (c->callback)
- c->callback(c, c->state, response_headers, c->userdata);
+ c->callback(c, c->state, c->response_headers, c->userdata);
+ pa_headerlist_free(c->response_headers);
+ c->response_headers = NULL;
+}
- if (response_headers)
- pa_headerlist_free(response_headers);
- /*
- if (do_read(u) < 0 || do_write(u) < 0) {
+static void line_callback(pa_ioline *line, const char *s, void *userdata) {
+ char *delimpos;
+ char *s2, *s2p;
- if (u->io) {
- pa_iochannel_free(u->io);
- u->io = NULL;
+ pa_rtsp_context *c = userdata;
+ pa_assert(line);
+ pa_assert(c);
+ pa_assert(s);
+
+ s2 = pa_xstrdup(s);
+ /* Trim trailing carriage returns */
+ s2p = s2 + strlen(s2) - 1;
+ while (s2p >= s2 && '\r' == *s2p) {
+ *s2p = '\0';
+ s2p -= 1;
+ }
+ if (c->waiting && 0 == strcmp("RTSP/1.0 200 OK", s2)) {
+ c->waiting = 0;
+ pa_assert(!c->response_headers);
+ c->response_headers = pa_headerlist_new();
+ goto exit;
+ }
+ if (c->waiting) {
+ pa_log_warn("Unexpected response: %s", s2);
+ goto exit;;
+ }
+ if (!strlen(s2)) {
+ /* End of headers */
+ /* We will have a header left from our looping itteration, so add it in :) */
+ if (c->last_header) {
+ /* This is not a continuation header so let's dump it into our proplist */
+ pa_headerlist_puts(c->response_headers, c->last_header, pa_strbuf_tostring_free(c->header_buffer));
+ pa_xfree(c->last_header);
+ c->last_header = NULL;
+ c->header_buffer= NULL;
}
- pa_module_unload_request(u->module);
+ pa_log_debug("Full response received. Dispatching");
+ headers_read(c);
+ c->waiting = 1;
+ goto exit;
}
- */
+
+ /* Read and parse a header (we know it's not empty) */
+ /* TODO: Move header reading into the headerlist. */
+
+ /* If the first character is a space, it's a continuation header */
+ if (c->last_header && ' ' == s2[0]) {
+ pa_assert(c->header_buffer);
+
+ /* Add this line to the buffer (sans the space. */
+ pa_strbuf_puts(c->header_buffer, &(s2[1]));
+ goto exit;
+ }
+
+ if (c->last_header) {
+ /* This is not a continuation header so let's dump the full
+ header/value into our proplist */
+ pa_headerlist_puts(c->response_headers, c->last_header, pa_strbuf_tostring_free(c->header_buffer));
+ pa_xfree(c->last_header);
+ c->last_header = NULL;
+ c->header_buffer = NULL;
+ }
+
+ delimpos = strstr(s2, ":");
+ if (!delimpos) {
+ pa_log_warn("Unexpected response when expecting header: %s", s);
+ goto exit;
+ }
+
+ pa_assert(!c->header_buffer);
+ pa_assert(!c->last_header);
+
+ c->header_buffer = pa_strbuf_new();
+ if (strlen(delimpos) > 1) {
+ /* Cut our line off so we can copy the header name out */
+ *delimpos++ = '\0';
+
+ /* Trim the front of any spaces */
+ while (' ' == *delimpos)
+ ++delimpos;
+
+ pa_strbuf_puts(c->header_buffer, delimpos);
+ } else {
+ /* Cut our line off so we can copy the header name out */
+ *delimpos = '\0';
+ }
+
+ /* Save the header name */
+ c->last_header = pa_xstrdup(s2);
+ exit:
+ pa_xfree(s2);
}
+
static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
pa_rtsp_context *c = userdata;
union {
@@ -385,7 +334,9 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata
}
pa_assert(!c->io);
c->io = io;
- pa_iochannel_set_callback(c->io, io_callback, c);
+
+ c->ioline = pa_ioline_new(io);
+ pa_ioline_set_callback(c->ioline, line_callback, c);
/* Get the local IP address for use externally */
if (0 == getsockname(pa_iochannel_get_recv_fd(io), &sa.sa, &sa_len)) {
@@ -401,6 +352,11 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata
c->localip = pa_xstrdup(res);
}
pa_log_debug("Established RTSP connection from local ip %s", c->localip);
+
+ c->waiting = 1;
+ c->state = STATE_CONNECT;
+ if (c->callback)
+ c->callback(c, c->state, NULL, c->userdata);
}
int pa_rtsp_connect(pa_rtsp_context *c, pa_mainloop_api *mainloop, const char* hostname, uint16_t port) {