diff options
| author | Colin Guthrie <pulse@colin.guthr.ie> | 2008-05-07 00:35:10 +0000 | 
|---|---|---|
| committer | Colin Guthrie <pulse@colin.guthr.ie> | 2008-10-08 20:32:07 +0100 | 
| commit | 6510d97315b9bdf7b1afc204c3dca0a2b0a3a528 (patch) | |
| tree | 5ae6076d53b63e6474b57dbfbf26f147b1d39f47 | |
| parent | 22e299ad3e16d1a2636653a7be9d625ecdc23802 (diff) | |
Use a more stateful response parser.
This makes things fully asyncronous.
Some of the continuation headerlist stuff could be moved to headerlist for neatness, but this is OK for now.
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/coling@2373 fefdeb5f-60dc-0310-8127-8f9354f1896f
| -rw-r--r-- | src/modules/rtp/rtsp.c | 320 | 
1 files changed, 138 insertions, 182 deletions
diff --git a/src/modules/rtp/rtsp.c b/src/modules/rtp/rtsp.c index 4f2411ab..44cd80b9 100644 --- a/src/modules/rtp/rtsp.c +++ b/src/modules/rtp/rtsp.c @@ -46,84 +46,35 @@  #include <pulsecore/macro.h>  #include <pulsecore/strbuf.h>  #include <pulsecore/poll.h> +#include <pulsecore/ioline.h>  #include "rtsp.h"  struct pa_rtsp_context {      pa_socket_client *sc;      pa_iochannel *io; -    pa_rtsp_cb_t callback; -    void* userdata; -    const char* useragent; -    pa_headerlist* headers; -    char* localip; -    char* url; -    uint32_t port; -    uint32_t cseq; -    char* session; -    char* transport; -    pa_rtsp_state state; -}; +    pa_ioline *ioline; -/* - * read one line from the file descriptor - * timeout: msec unit, -1 for infinite - * if CR comes then following LF is expected - * returned string in line is always null terminated, maxlen-1 is maximum string length - */ -static int pa_read_line(pa_iochannel* io, char *line, int maxlen, int timeout) -{ -    int i, rval; -    int count; -    int fd; -    char ch; -    struct pollfd pfds; - -    pa_assert(io); -    fd = pa_iochannel_get_recv_fd(io); - -    count = 0; -    *line = 0; -    pfds.events = POLLIN; -    pfds.fd = fd; - -    for (i=0; i<maxlen; ++i) { -        if (!poll(&pfds, 1, timeout)) -            return 0; - -        rval = read(fd, &ch, 1); - -        if (-1 == rval) { -            if (EAGAIN == errno) -                return 0; -            /*ERRMSG("%s:read error: %s\n", __func__, strerror(errno));*/ -            return -1; -        } - -        if (0 == rval) { -            /*INFMSG("%s:disconnected on the other end\n", __func__);*/ -            return -1; -        } - -        if ('\n' == ch) { -            *line = 0; -            return count; -        } - -        if ('\r' == ch) -            continue; +    pa_rtsp_cb_t callback; -        *line++ = ch; -        count++; +    void *userdata; +    const char *useragent; -        if (count >= maxlen-1) -            break; -    } +    pa_rtsp_state state; +    uint8_t waiting; -    *line = 0; -    return count; -} +    pa_headerlist* headers; +    char *last_header; +    pa_strbuf *header_buffer; +    pa_headerlist* response_headers; +    char *localip; +    char *url; +    uint32_t port; +    uint32_t cseq; +    char *session; +    char *transport; +};  static int pa_rtsp_exec(pa_rtsp_context* c, const char* cmd,                          const char* content_type, const char* content, @@ -172,8 +123,8 @@ static int pa_rtsp_exec(pa_rtsp_context* c, const char* cmd,      /* Our packet is created... now we can send it :) */      hdrs = pa_strbuf_tostring_free(buf); -    pa_log_debug("Submitting request:"); -    pa_log_debug(hdrs); +    /*pa_log_debug("Submitting request:"); +    pa_log_debug(hdrs);*/      l = pa_iochannel_write(c->io, hdrs, strlen(hdrs));      pa_xfree(hdrs); @@ -205,125 +156,39 @@ void pa_rtsp_context_free(pa_rtsp_context* c) {          pa_xfree(c->localip);          pa_xfree(c->session);          pa_xfree(c->transport); +        pa_xfree(c->last_header); +        if (c->header_buffer) +            pa_strbuf_free(c->header_buffer); +        if (c->response_headers) +            pa_headerlist_free(c->response_headers);          pa_headerlist_free(c->headers);      }      pa_xfree(c);  } -static void io_callback(PA_GCC_UNUSED pa_iochannel *io, void *userdata) { -    pa_strbuf* buf; -    pa_headerlist* response_headers = NULL; -    char response[1024]; -    int timeout; +static void headers_read(pa_rtsp_context *c) {      char* token; -    char* header; -    char* delimpos; -    char delimiters[] = " "; -    pa_rtsp_context *c = userdata; -    pa_assert(c); -    pa_assert(c->io == io); - -    if (!pa_iochannel_is_readable(c->io)) { -        if (STATE_SETUP == c->state || STATE_ANNOUNCE == c->state) return; -        goto do_callback; -    } - -    /* TODO: convert this to a pa_ioline based reader */ -    if (STATE_SETUP == c->state || STATE_ANNOUNCE == c->state) { -        response_headers = pa_headerlist_new(); -    } -    timeout = 5000; -    /* read in any response headers */ -    if (pa_read_line(c->io, response, sizeof(response), timeout) > 0) { -        const char* token_state = NULL; -        pa_log_debug("Response Line: %s", response); - -        timeout = 1000; -        pa_xfree(pa_split(response, delimiters, &token_state)); -        token = pa_split(response, delimiters, &token_state); -        if (!token || strcmp(token, "200")) { -            pa_xfree(token); -            pa_log("Invalid Response"); -            /* TODO: Bail out completely */ -            return; -        } -        pa_xfree(token); - -        /* We want to return the headers? */ -        if (!response_headers) { -            /* We have no storage, so just clear out the response. */ -            while (pa_read_line(c->io, response, sizeof(response), timeout) > 0){ -                pa_log_debug("Response Line: %s", response); -            } -        } else { -            /* TODO: Move header reading into the headerlist. */ -            header = NULL; -            buf = pa_strbuf_new(); -            while (pa_read_line(c->io, response, sizeof(response), timeout) > 0) { -                pa_log_debug("Response Line: %s", response); -                /* If the first character is a space, it's a continuation header */ -                if (header && ' ' == response[0]) { -                    /* Add this line to the buffer (sans the space. */ -                    pa_strbuf_puts(buf, &(response[1])); -                    continue; -                } - -                if (header) { -                    /* This is not a continuation header so let's dump the full -                      header/value into our proplist */ -                    pa_headerlist_puts(response_headers, header, pa_strbuf_tostring_free(buf)); -                    pa_xfree(header); -                    buf = pa_strbuf_new(); -                } - -                delimpos = strstr(response, ":"); -                if (!delimpos) { -                    pa_log("Invalid response header"); -                    return; -                } - -                if (strlen(delimpos) > 1) { -                    /* Cut our line off so we can copy the header name out */ -                    *delimpos++ = '\0'; - -                    /* Trim the front of any spaces */ -                    while (' ' == *delimpos) -                        ++delimpos; +    char delimiters[] = ";"; -                    pa_strbuf_puts(buf, delimpos); -                } else { -                    /* Cut our line off so we can copy the header name out */ -                    *delimpos = '\0'; -                } - -                /* Save the header name */ -                header = pa_xstrdup(response); -            } -            /* We will have a header left from our looping itteration, so add it in :) */ -            if (header) { -                /* This is not a continuation header so let's dump it into our proplist */ -                pa_headerlist_puts(response_headers, header, pa_strbuf_tostring(buf)); -            } -            pa_strbuf_free(buf); -        } -    } +    pa_assert(c); +    pa_assert(c->response_headers);      /* Deal with a SETUP response */      if (STATE_SETUP == c->state) {          const char* token_state = NULL;          const char* pc = NULL; -        c->session = pa_xstrdup(pa_headerlist_gets(response_headers, "Session")); -        c->transport = pa_xstrdup(pa_headerlist_gets(response_headers, "Transport")); +        c->session = pa_xstrdup(pa_headerlist_gets(c->response_headers, "Session")); +        c->transport = pa_xstrdup(pa_headerlist_gets(c->response_headers, "Transport"));          if (!c->session || !c->transport) { -            pa_headerlist_free(response_headers); +            pa_headerlist_free(c->response_headers); +            c->response_headers = NULL; +            pa_log("Invalid SETUP response.");              return;          }          /* Now parse out the server port component of the response. */ -        c->port = 0; -        delimiters[0] = ';';          while ((token = pa_split(c->transport, delimiters, &token_state))) {              if ((pc = strstr(token, "="))) {                  if (0 == strncmp(token, "server_port", 11)) { @@ -336,33 +201,117 @@ static void io_callback(PA_GCC_UNUSED pa_iochannel *io, void *userdata) {          }          if (0 == c->port) {              /* Error no server_port in response */ -            pa_headerlist_free(response_headers); +            pa_headerlist_free(c->response_headers); +            c->response_headers = NULL; +            pa_log("Invalid SETUP response (no port number).");              return;          }      }      /* Call our callback */ -do_callback:      if (c->callback) -        c->callback(c, c->state, response_headers, c->userdata); +        c->callback(c, c->state, c->response_headers, c->userdata); +    pa_headerlist_free(c->response_headers); +    c->response_headers = NULL; +} -    if (response_headers) -        pa_headerlist_free(response_headers); -    /* -    if (do_read(u) < 0 || do_write(u) < 0) { +static void line_callback(pa_ioline *line, const char *s, void *userdata) { +    char *delimpos; +    char *s2, *s2p; -        if (u->io) { -            pa_iochannel_free(u->io); -            u->io = NULL; +    pa_rtsp_context *c = userdata; +    pa_assert(line); +    pa_assert(c); +    pa_assert(s); + +    s2 = pa_xstrdup(s); +    /* Trim trailing carriage returns */ +    s2p = s2 + strlen(s2) - 1; +    while (s2p >= s2 && '\r' == *s2p) { +        *s2p = '\0'; +        s2p -= 1; +    } +    if (c->waiting && 0 == strcmp("RTSP/1.0 200 OK", s2)) { +        c->waiting = 0; +        pa_assert(!c->response_headers); +        c->response_headers = pa_headerlist_new(); +        goto exit; +    } +    if (c->waiting) { +        pa_log_warn("Unexpected response: %s", s2); +        goto exit;; +    } +    if (!strlen(s2)) { +        /* End of headers */ +        /* We will have a header left from our looping itteration, so add it in :) */ +        if (c->last_header) { +            /* This is not a continuation header so let's dump it into our proplist */ +            pa_headerlist_puts(c->response_headers, c->last_header, pa_strbuf_tostring_free(c->header_buffer)); +            pa_xfree(c->last_header); +            c->last_header = NULL; +            c->header_buffer= NULL;          } -       pa_module_unload_request(u->module); +        pa_log_debug("Full response received. Dispatching"); +        headers_read(c); +        c->waiting = 1; +        goto exit;      } -    */ + +    /* Read and parse a header (we know it's not empty) */ +    /* TODO: Move header reading into the headerlist. */ + +    /* If the first character is a space, it's a continuation header */ +    if (c->last_header && ' ' == s2[0]) { +        pa_assert(c->header_buffer); + +        /* Add this line to the buffer (sans the space. */ +        pa_strbuf_puts(c->header_buffer, &(s2[1])); +        goto exit; +    } + +    if (c->last_header) { +        /* This is not a continuation header so let's dump the full +          header/value into our proplist */ +        pa_headerlist_puts(c->response_headers, c->last_header, pa_strbuf_tostring_free(c->header_buffer)); +        pa_xfree(c->last_header); +        c->last_header = NULL; +        c->header_buffer = NULL; +    } + +    delimpos = strstr(s2, ":"); +    if (!delimpos) { +        pa_log_warn("Unexpected response when expecting header: %s", s); +        goto exit; +    } + +    pa_assert(!c->header_buffer); +    pa_assert(!c->last_header); + +    c->header_buffer = pa_strbuf_new(); +    if (strlen(delimpos) > 1) { +        /* Cut our line off so we can copy the header name out */ +        *delimpos++ = '\0'; + +        /* Trim the front of any spaces */ +        while (' ' == *delimpos) +            ++delimpos; + +        pa_strbuf_puts(c->header_buffer, delimpos); +    } else { +        /* Cut our line off so we can copy the header name out */ +        *delimpos = '\0'; +    } + +    /* Save the header name */ +    c->last_header = pa_xstrdup(s2); +  exit: +    pa_xfree(s2);  } +  static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {      pa_rtsp_context *c = userdata;      union { @@ -385,7 +334,9 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata      }      pa_assert(!c->io);      c->io = io; -    pa_iochannel_set_callback(c->io, io_callback, c); + +    c->ioline = pa_ioline_new(io); +    pa_ioline_set_callback(c->ioline, line_callback, c);      /* Get the local IP address for use externally */      if (0 == getsockname(pa_iochannel_get_recv_fd(io), &sa.sa, &sa_len)) { @@ -401,6 +352,11 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata              c->localip = pa_xstrdup(res);      }      pa_log_debug("Established RTSP connection from local ip %s", c->localip); + +    c->waiting = 1; +    c->state = STATE_CONNECT; +    if (c->callback) +        c->callback(c, c->state, NULL, c->userdata);  }  int pa_rtsp_connect(pa_rtsp_context *c, pa_mainloop_api *mainloop, const char* hostname, uint16_t port) {  | 
