diff options
Diffstat (limited to 'src/modules/module-pipe-source.c')
| -rw-r--r-- | src/modules/module-pipe-source.c | 103 | 
1 files changed, 47 insertions, 56 deletions
| diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c index 2209d1ee..6dc94648 100644 --- a/src/modules/module-pipe-source.c +++ b/src/modules/module-pipe-source.c @@ -45,6 +45,7 @@  #include <pulsecore/log.h>  #include <pulsecore/thread.h>  #include <pulsecore/thread-mq.h> +#include <pulsecore/rtpoll.h>  #include "module-pipe-source-symdef.h" @@ -66,13 +67,17 @@ struct userdata {      pa_core *core;      pa_module *module;      pa_source *source; +      pa_thread *thread;      pa_thread_mq thread_mq; -     +    pa_rtpoll *rtpoll; +      char *filename;      int fd;      pa_memchunk memchunk; + +    pa_rtpoll_item *rtpoll_item;  };  static const char* const valid_modargs[] = { @@ -86,14 +91,7 @@ static const char* const valid_modargs[] = {  };  static void thread_func(void *userdata) { -    enum { -        POLLFD_ASYNCQ, -        POLLFD_FIFO, -        POLLFD_MAX, -    }; -          struct userdata *u = userdata; -    struct pollfd pollfd[POLLFD_MAX];      int read_type = 0;      pa_assert(u); @@ -101,40 +99,18 @@ static void thread_func(void *userdata) {      pa_log_debug("Thread starting up");      pa_thread_mq_install(&u->thread_mq); - -    memset(&pollfd, 0, sizeof(pollfd)); +    pa_rtpoll_install(u->rtpoll); -    pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->thread_mq.inq); -    pollfd[POLLFD_ASYNCQ].events = POLLIN; -    pollfd[POLLFD_FIFO].fd = u->fd; -      for (;;) { -        pa_msgobject *object; -        int code; -        void *data; -        pa_memchunk chunk; -        int r; -        int64_t offset; - -        /* Check whether there is a message for us to process */ -        if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) { -            int ret; - -            if (!object && code == PA_MESSAGE_SHUTDOWN) { -                pa_asyncmsgq_done(u->thread_mq.inq, 0); -                goto finish; -            } - -            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); -            pa_asyncmsgq_done(u->thread_mq.inq, ret); -            continue; -        } +        int ret; +        struct pollfd *pollfd; +        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); +                  /* Try to read some data and pass it on to the source driver */ - -        if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd[POLLFD_FIFO].revents) { -            void *p; +        if (u->source->thread_info.state == PA_SOURCE_RUNNING && pollfd->revents) {              ssize_t l; +            void *p;              if (!u->memchunk.memblock) {                  u->memchunk.memblock = pa_memblock_new(u->core->mempool, PIPE_BUF); @@ -169,38 +145,35 @@ static void thread_func(void *userdata) {                      pa_memchunk_reset(&u->memchunk);                  } -                pollfd[POLLFD_FIFO].revents = 0; -                continue; +                pollfd->revents = 0;              }          } -        pollfd[POLLFD_FIFO].events = u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0; - -        /* Hmm, nothing to do. Let's sleep */ - -        if (pa_asyncmsgq_before_poll(u->thread_mq.inq) < 0) +        /* Now give the source outputs some to time to process their data */ +        if ((ret = pa_source_process_outputs(u->source)) < 0) +            goto fail; +        if (ret > 0)              continue; -/*         pa_log("polling for %i", pollfd[POLLFD_FIFO].events); */ -        r = poll(pollfd, POLLFD_MAX, -1); -/*         pa_log("polling got %i (r=%i) %i", r > 0 ? pollfd[POLLFD_FIFO].revents : 0, r, r > 0 ? pollfd[POLLFD_ASYNCQ].revents: 0); */ - -        pa_asyncmsgq_after_poll(u->thread_mq.inq); +        /* Check whether there is a message for us to process */ +        if ((ret = pa_thread_mq_process(&u->thread_mq) < 0)) +            goto finish; +        if (ret > 0) +            continue; -        if (r < 0) { -            if (errno == EINTR) -                continue; +        /* Hmm, nothing to do. Let's sleep */ +        pollfd->events = u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0; +        if (pa_rtpoll_run(u->rtpoll) < 0) {              pa_log("poll() failed: %s", pa_cstrerror(errno));              goto fail;          } -        if (pollfd[POLLFD_FIFO].revents & ~POLLIN) { +        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); +        if (pollfd->revents & ~POLLIN) {              pa_log("FIFO shutdown.");              goto fail;          } - -        pa_assert((pollfd[POLLFD_ASYNCQ].revents & ~POLLIN) == 0);      }  fail: @@ -220,6 +193,7 @@ int pa__init(pa_module*m) {      pa_channel_map map;      pa_modargs *ma;      char *t; +    struct pollfd *pollfd;      pa_assert(m); @@ -240,6 +214,8 @@ int pa__init(pa_module*m) {      m->userdata = u;      pa_memchunk_reset(&u->memchunk);      pa_thread_mq_init(&u->thread_mq, m->core->mainloop); +    u->rtpoll = pa_rtpoll_new(); +    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, u->thread_mq.inq);      u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); @@ -268,17 +244,26 @@ int pa__init(pa_module*m) {      }      u->source->userdata = u; +    u->source->flags = 0;      pa_source_set_module(u->source, m);      pa_source_set_asyncmsgq(u->source, u->thread_mq.inq); +    pa_source_set_rtpoll(u->source, u->rtpoll);      pa_source_set_description(u->source, t = pa_sprintf_malloc("Unix FIFO source '%s'", u->filename));      pa_xfree(t); +    u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, 1); +    pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); +    pollfd->fd = u->fd; +    pollfd->events = pollfd->revents = 0; +          if (!(u->thread = pa_thread_new(thread_func, u))) {          pa_log("Failed to create thread.");          goto fail;      } +    pa_source_put(u->source); +      pa_modargs_free(ma);      return 0; @@ -301,7 +286,7 @@ void pa__done(pa_module*m) {          return;      if (u->source) -        pa_source_disconnect(u->source); +        pa_source_unlink(u->source);      if (u->thread) {          pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL); @@ -316,6 +301,12 @@ void pa__done(pa_module*m) {      if (u->memchunk.memblock)          pa_memblock_unref(u->memchunk.memblock); +    if (u->rtpoll_item) +        pa_rtpoll_item_free(u->rtpoll_item); +     +    if (u->rtpoll) +        pa_rtpoll_free(u->rtpoll); +          if (u->filename) {          unlink(u->filename);          pa_xfree(u->filename); | 
