From 6510d97315b9bdf7b1afc204c3dca0a2b0a3a528 Mon Sep 17 00:00:00 2001 From: Colin Guthrie Date: Wed, 7 May 2008 00:35:10 +0000 Subject: Use a more stateful response parser. This makes things fully asyncronous. Some of the continuation headerlist stuff could be moved to headerlist for neatness, but this is OK for now. git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/coling@2373 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/modules/rtp/rtsp.c | 320 +++++++++++++++++++++---------------------------- 1 file changed, 138 insertions(+), 182 deletions(-) (limited to 'src') diff --git a/src/modules/rtp/rtsp.c b/src/modules/rtp/rtsp.c index 4f2411ab..44cd80b9 100644 --- a/src/modules/rtp/rtsp.c +++ b/src/modules/rtp/rtsp.c @@ -46,84 +46,35 @@ #include #include #include +#include #include "rtsp.h" struct pa_rtsp_context { pa_socket_client *sc; pa_iochannel *io; - pa_rtsp_cb_t callback; - void* userdata; - const char* useragent; - pa_headerlist* headers; - char* localip; - char* url; - uint32_t port; - uint32_t cseq; - char* session; - char* transport; - pa_rtsp_state state; -}; + pa_ioline *ioline; -/* - * read one line from the file descriptor - * timeout: msec unit, -1 for infinite - * if CR comes then following LF is expected - * returned string in line is always null terminated, maxlen-1 is maximum string length - */ -static int pa_read_line(pa_iochannel* io, char *line, int maxlen, int timeout) -{ - int i, rval; - int count; - int fd; - char ch; - struct pollfd pfds; - - pa_assert(io); - fd = pa_iochannel_get_recv_fd(io); - - count = 0; - *line = 0; - pfds.events = POLLIN; - pfds.fd = fd; - - for (i=0; i= maxlen-1) - break; - } + pa_rtsp_state state; + uint8_t waiting; - *line = 0; - return count; -} + pa_headerlist* headers; + char *last_header; + pa_strbuf *header_buffer; + pa_headerlist* response_headers; + char *localip; + char *url; + uint32_t port; + uint32_t cseq; + char *session; + char *transport; +}; static int pa_rtsp_exec(pa_rtsp_context* c, const char* cmd, const char* content_type, const char* content, @@ -172,8 +123,8 @@ static int pa_rtsp_exec(pa_rtsp_context* c, const char* cmd, /* Our packet is created... now we can send it :) */ hdrs = pa_strbuf_tostring_free(buf); - pa_log_debug("Submitting request:"); - pa_log_debug(hdrs); + /*pa_log_debug("Submitting request:"); + pa_log_debug(hdrs);*/ l = pa_iochannel_write(c->io, hdrs, strlen(hdrs)); pa_xfree(hdrs); @@ -205,125 +156,39 @@ void pa_rtsp_context_free(pa_rtsp_context* c) { pa_xfree(c->localip); pa_xfree(c->session); pa_xfree(c->transport); + pa_xfree(c->last_header); + if (c->header_buffer) + pa_strbuf_free(c->header_buffer); + if (c->response_headers) + pa_headerlist_free(c->response_headers); pa_headerlist_free(c->headers); } pa_xfree(c); } -static void io_callback(PA_GCC_UNUSED pa_iochannel *io, void *userdata) { - pa_strbuf* buf; - pa_headerlist* response_headers = NULL; - char response[1024]; - int timeout; +static void headers_read(pa_rtsp_context *c) { char* token; - char* header; - char* delimpos; - char delimiters[] = " "; - pa_rtsp_context *c = userdata; - pa_assert(c); - pa_assert(c->io == io); - - if (!pa_iochannel_is_readable(c->io)) { - if (STATE_SETUP == c->state || STATE_ANNOUNCE == c->state) return; - goto do_callback; - } - - /* TODO: convert this to a pa_ioline based reader */ - if (STATE_SETUP == c->state || STATE_ANNOUNCE == c->state) { - response_headers = pa_headerlist_new(); - } - timeout = 5000; - /* read in any response headers */ - if (pa_read_line(c->io, response, sizeof(response), timeout) > 0) { - const char* token_state = NULL; - pa_log_debug("Response Line: %s", response); - - timeout = 1000; - pa_xfree(pa_split(response, delimiters, &token_state)); - token = pa_split(response, delimiters, &token_state); - if (!token || strcmp(token, "200")) { - pa_xfree(token); - pa_log("Invalid Response"); - /* TODO: Bail out completely */ - return; - } - pa_xfree(token); - - /* We want to return the headers? */ - if (!response_headers) { - /* We have no storage, so just clear out the response. */ - while (pa_read_line(c->io, response, sizeof(response), timeout) > 0){ - pa_log_debug("Response Line: %s", response); - } - } else { - /* TODO: Move header reading into the headerlist. */ - header = NULL; - buf = pa_strbuf_new(); - while (pa_read_line(c->io, response, sizeof(response), timeout) > 0) { - pa_log_debug("Response Line: %s", response); - /* If the first character is a space, it's a continuation header */ - if (header && ' ' == response[0]) { - /* Add this line to the buffer (sans the space. */ - pa_strbuf_puts(buf, &(response[1])); - continue; - } - - if (header) { - /* This is not a continuation header so let's dump the full - header/value into our proplist */ - pa_headerlist_puts(response_headers, header, pa_strbuf_tostring_free(buf)); - pa_xfree(header); - buf = pa_strbuf_new(); - } - - delimpos = strstr(response, ":"); - if (!delimpos) { - pa_log("Invalid response header"); - return; - } - - if (strlen(delimpos) > 1) { - /* Cut our line off so we can copy the header name out */ - *delimpos++ = '\0'; - - /* Trim the front of any spaces */ - while (' ' == *delimpos) - ++delimpos; + char delimiters[] = ";"; - pa_strbuf_puts(buf, delimpos); - } else { - /* Cut our line off so we can copy the header name out */ - *delimpos = '\0'; - } - - /* Save the header name */ - header = pa_xstrdup(response); - } - /* We will have a header left from our looping itteration, so add it in :) */ - if (header) { - /* This is not a continuation header so let's dump it into our proplist */ - pa_headerlist_puts(response_headers, header, pa_strbuf_tostring(buf)); - } - pa_strbuf_free(buf); - } - } + pa_assert(c); + pa_assert(c->response_headers); /* Deal with a SETUP response */ if (STATE_SETUP == c->state) { const char* token_state = NULL; const char* pc = NULL; - c->session = pa_xstrdup(pa_headerlist_gets(response_headers, "Session")); - c->transport = pa_xstrdup(pa_headerlist_gets(response_headers, "Transport")); + c->session = pa_xstrdup(pa_headerlist_gets(c->response_headers, "Session")); + c->transport = pa_xstrdup(pa_headerlist_gets(c->response_headers, "Transport")); if (!c->session || !c->transport) { - pa_headerlist_free(response_headers); + pa_headerlist_free(c->response_headers); + c->response_headers = NULL; + pa_log("Invalid SETUP response."); return; } /* Now parse out the server port component of the response. */ - c->port = 0; - delimiters[0] = ';'; while ((token = pa_split(c->transport, delimiters, &token_state))) { if ((pc = strstr(token, "="))) { if (0 == strncmp(token, "server_port", 11)) { @@ -336,33 +201,117 @@ static void io_callback(PA_GCC_UNUSED pa_iochannel *io, void *userdata) { } if (0 == c->port) { /* Error no server_port in response */ - pa_headerlist_free(response_headers); + pa_headerlist_free(c->response_headers); + c->response_headers = NULL; + pa_log("Invalid SETUP response (no port number)."); return; } } /* Call our callback */ -do_callback: if (c->callback) - c->callback(c, c->state, response_headers, c->userdata); + c->callback(c, c->state, c->response_headers, c->userdata); + pa_headerlist_free(c->response_headers); + c->response_headers = NULL; +} - if (response_headers) - pa_headerlist_free(response_headers); - /* - if (do_read(u) < 0 || do_write(u) < 0) { +static void line_callback(pa_ioline *line, const char *s, void *userdata) { + char *delimpos; + char *s2, *s2p; - if (u->io) { - pa_iochannel_free(u->io); - u->io = NULL; + pa_rtsp_context *c = userdata; + pa_assert(line); + pa_assert(c); + pa_assert(s); + + s2 = pa_xstrdup(s); + /* Trim trailing carriage returns */ + s2p = s2 + strlen(s2) - 1; + while (s2p >= s2 && '\r' == *s2p) { + *s2p = '\0'; + s2p -= 1; + } + if (c->waiting && 0 == strcmp("RTSP/1.0 200 OK", s2)) { + c->waiting = 0; + pa_assert(!c->response_headers); + c->response_headers = pa_headerlist_new(); + goto exit; + } + if (c->waiting) { + pa_log_warn("Unexpected response: %s", s2); + goto exit;; + } + if (!strlen(s2)) { + /* End of headers */ + /* We will have a header left from our looping itteration, so add it in :) */ + if (c->last_header) { + /* This is not a continuation header so let's dump it into our proplist */ + pa_headerlist_puts(c->response_headers, c->last_header, pa_strbuf_tostring_free(c->header_buffer)); + pa_xfree(c->last_header); + c->last_header = NULL; + c->header_buffer= NULL; } - pa_module_unload_request(u->module); + pa_log_debug("Full response received. Dispatching"); + headers_read(c); + c->waiting = 1; + goto exit; } - */ + + /* Read and parse a header (we know it's not empty) */ + /* TODO: Move header reading into the headerlist. */ + + /* If the first character is a space, it's a continuation header */ + if (c->last_header && ' ' == s2[0]) { + pa_assert(c->header_buffer); + + /* Add this line to the buffer (sans the space. */ + pa_strbuf_puts(c->header_buffer, &(s2[1])); + goto exit; + } + + if (c->last_header) { + /* This is not a continuation header so let's dump the full + header/value into our proplist */ + pa_headerlist_puts(c->response_headers, c->last_header, pa_strbuf_tostring_free(c->header_buffer)); + pa_xfree(c->last_header); + c->last_header = NULL; + c->header_buffer = NULL; + } + + delimpos = strstr(s2, ":"); + if (!delimpos) { + pa_log_warn("Unexpected response when expecting header: %s", s); + goto exit; + } + + pa_assert(!c->header_buffer); + pa_assert(!c->last_header); + + c->header_buffer = pa_strbuf_new(); + if (strlen(delimpos) > 1) { + /* Cut our line off so we can copy the header name out */ + *delimpos++ = '\0'; + + /* Trim the front of any spaces */ + while (' ' == *delimpos) + ++delimpos; + + pa_strbuf_puts(c->header_buffer, delimpos); + } else { + /* Cut our line off so we can copy the header name out */ + *delimpos = '\0'; + } + + /* Save the header name */ + c->last_header = pa_xstrdup(s2); + exit: + pa_xfree(s2); } + static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) { pa_rtsp_context *c = userdata; union { @@ -385,7 +334,9 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata } pa_assert(!c->io); c->io = io; - pa_iochannel_set_callback(c->io, io_callback, c); + + c->ioline = pa_ioline_new(io); + pa_ioline_set_callback(c->ioline, line_callback, c); /* Get the local IP address for use externally */ if (0 == getsockname(pa_iochannel_get_recv_fd(io), &sa.sa, &sa_len)) { @@ -401,6 +352,11 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata c->localip = pa_xstrdup(res); } pa_log_debug("Established RTSP connection from local ip %s", c->localip); + + c->waiting = 1; + c->state = STATE_CONNECT; + if (c->callback) + c->callback(c, c->state, NULL, c->userdata); } int pa_rtsp_connect(pa_rtsp_context *c, pa_mainloop_api *mainloop, const char* hostname, uint16_t port) { -- cgit