diff options
| author | Lennart Poettering <lennart@poettering.net> | 2007-08-22 22:27:53 +0000 | 
|---|---|---|
| committer | Lennart Poettering <lennart@poettering.net> | 2007-08-22 22:27:53 +0000 | 
| commit | 9d381599be89f5cb99da60f83e086e476f50a72f (patch) | |
| tree | 6916e1eedf869248ac2c6f767c686e39e2a85be8 /src | |
| parent | 1bfa1802d48149cbd699e36cf80989b9c062341b (diff) | |
port remaining sinks to pa_rtpoll
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1705 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src')
| -rw-r--r-- | src/modules/module-alsa-source.c | 295 | ||||
| -rw-r--r-- | src/modules/module-null-sink.c | 88 | ||||
| -rw-r--r-- | src/modules/module-oss.c | 170 | ||||
| -rw-r--r-- | src/modules/module-pipe-sink.c | 93 | 
4 files changed, 306 insertions, 340 deletions
diff --git a/src/modules/module-alsa-source.c b/src/modules/module-alsa-source.c index 58e7cb4a..6ea99ec7 100644 --- a/src/modules/module-alsa-source.c +++ b/src/modules/module-alsa-source.c @@ -29,15 +29,10 @@  #include <assert.h>  #include <stdio.h> -#ifdef HAVE_SYS_POLL_H -#include <sys/poll.h> -#else -#include "poll.h" -#endif -  #include <asoundlib.h>  #include <pulse/xmalloc.h> +#include <pulse/util.h>  #include <pulsecore/core-error.h>  #include <pulsecore/core.h> @@ -52,6 +47,7 @@  #include <pulsecore/thread.h>  #include <pulsecore/core-error.h>  #include <pulsecore/thread-mq.h> +#include <pulsecore/rtpoll.h>  #include "alsa-util.h"  #include "module-alsa-source-symdef.h" @@ -76,8 +72,10 @@ struct userdata {      pa_core *core;      pa_module *module;      pa_source *source; +          pa_thread *thread;      pa_thread_mq thread_mq; +    pa_rtpoll *rtpoll;      snd_pcm_t *pcm_handle; @@ -93,13 +91,7 @@ struct userdata {      int use_mmap; -    struct pollfd *pollfd; -    int n_alsa_fds; -}; - -enum { -    POLLFD_ASYNCQ, -    POLLFD_ALSA_BASE +    pa_rtpoll_item *alsa_rtpoll_item;  };  static const char* const valid_modargs[] = { @@ -116,16 +108,16 @@ static const char* const valid_modargs[] = {  };  static int mmap_read(struct userdata *u) { -    snd_pcm_sframes_t n; -    int err; -    const snd_pcm_channel_area_t *areas; -    snd_pcm_uframes_t offset, frames;      int work_done = 0;      pa_assert(u); -    pa_assert(u->source); +    pa_source_assert_ref(u->source);      for (;;) { +        snd_pcm_sframes_t n; +        int err; +        const snd_pcm_channel_area_t *areas; +        snd_pcm_uframes_t offset, frames;          pa_memchunk chunk;          void *p; @@ -207,6 +199,73 @@ static int mmap_read(struct userdata *u) {      }  } +static int unix_read(struct userdata *u) { +    snd_pcm_status_t *status; +    int work_done = 0; + +    snd_pcm_status_alloca(&status); + +    pa_assert(u); +    pa_source_assert_ref(u->source); + +    for (;;) { +        void *p; +        snd_pcm_sframes_t t; +        ssize_t l; +        int err; +        pa_memchunk chunk; +         +        if ((err = snd_pcm_status(u->pcm_handle, status)) < 0) { +            pa_log("Failed to query DSP status data: %s", snd_strerror(t)); +            return -1; +        } + +        if (snd_pcm_status_get_avail_max(status)*u->frame_size >= u->hwbuf_size) +            pa_log_debug("Buffer overrun!"); +                     +        l = snd_pcm_status_get_avail(status) * u->frame_size; + +        if (l <= 0) +            return work_done; +                     +        chunk.memblock = pa_memblock_new(u->core->mempool, l); + +        p = pa_memblock_acquire(chunk.memblock); +        t = snd_pcm_readi(u->pcm_handle, (uint8_t*) p, l / u->frame_size); +        pa_memblock_release(chunk.memblock); +         +/*                     pa_log("wrote %i bytes of %u (%u)", t*u->frame_size, u->memchunk.length, l);   */ +         +        pa_assert(t != 0); +                     +        if (t < 0) { +            pa_memblock_unref(chunk.memblock); +             +            if ((t = snd_pcm_recover(u->pcm_handle, t, 1)) == 0) +                continue; +                         +            if (t == -EAGAIN) { +                pa_log_debug("EAGAIN"); +                return work_done; +            } else { +                pa_log("Failed to read data from DSP: %s", snd_strerror(t)); +                return -1; +            } +        }  +                         +        chunk.index = 0; +        chunk.length = t * u->frame_size; + +        pa_source_post(u->source, &chunk); +        pa_memblock_unref(chunk.memblock); +                     +        work_done = 1; + +        if (t * u->frame_size >= (unsigned) l) +            return work_done; +    } +} +  static pa_usec_t source_get_latency(struct userdata *u) {      pa_usec_t r = 0;      snd_pcm_status_t *status; @@ -216,6 +275,7 @@ static pa_usec_t source_get_latency(struct userdata *u) {      snd_pcm_status_alloca(&status);      pa_assert(u); +    pa_assert(u->pcm_handle);      if ((err = snd_pcm_status(u->pcm_handle, status)) < 0)           pa_log("Failed to get delay: %s", snd_strerror(err)); @@ -230,22 +290,24 @@ static pa_usec_t source_get_latency(struct userdata *u) {  static int build_pollfd(struct userdata *u) {      int err; +    struct pollfd *pollfd; +    int n;      pa_assert(u);      pa_assert(u->pcm_handle); -    if ((u->n_alsa_fds = snd_pcm_poll_descriptors_count(u->pcm_handle)) < 0) { -        pa_log("snd_pcm_poll_descriptors_count() failed: %s", snd_strerror(u->n_alsa_fds)); +    if ((n = snd_pcm_poll_descriptors_count(u->pcm_handle)) < 0) { +        pa_log("snd_pcm_poll_descriptors_count() failed: %s", snd_strerror(n));          return -1;      } -    pa_xfree(u->pollfd); -    u->pollfd = pa_xnew0(struct pollfd, POLLFD_ALSA_BASE + u->n_alsa_fds); - -    u->pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->thread_mq.inq); -    u->pollfd[POLLFD_ASYNCQ].events = POLLIN; +    if (u->alsa_rtpoll_item) +        pa_rtpoll_item_free(u->alsa_rtpoll_item); -    if ((err = snd_pcm_poll_descriptors(u->pcm_handle, u->pollfd+POLLFD_ALSA_BASE, u->n_alsa_fds)) < 0) { +    u->alsa_rtpoll_item = pa_rtpoll_item_new(u->rtpoll, n); +    pollfd = pa_rtpoll_item_get_pollfd(u->alsa_rtpoll_item, NULL); +     +    if ((err = snd_pcm_poll_descriptors(u->pcm_handle, pollfd, n)) < 0) {          pa_log("snd_pcm_poll_descriptors() failed: %s", snd_strerror(err));          return -1;      } @@ -261,6 +323,11 @@ static int suspend(struct userdata *u) {      snd_pcm_close(u->pcm_handle);      u->pcm_handle = NULL; +    if (u->alsa_rtpoll_item) { +        pa_rtpoll_item_free(u->alsa_rtpoll_item); +        u->alsa_rtpoll_item = NULL; +    } +          pa_log_debug("Device suspended...");      return 0; @@ -505,14 +572,9 @@ static int source_set_mute_cb(pa_source *s) {  }  static void thread_func(void *userdata) { -      struct userdata *u = userdata; -    int err; -    unsigned short revents = 0; -    snd_pcm_status_t *status;      pa_assert(u); -    snd_pcm_status_alloca(&status);      pa_log_debug("Thread starting up"); @@ -520,20 +582,37 @@ static void thread_func(void *userdata) {          pa_make_realtime();      pa_thread_mq_install(&u->thread_mq); +    pa_rtpoll_install(u->rtpoll);      if (build_pollfd(u) < 0)          goto fail; +    snd_pcm_start(u->pcm_handle); +          for (;;) {          pa_msgobject *object;          int code;          void *data; -        int r;          int64_t offset;          pa_memchunk chunk;  /*         pa_log("loop");     */ +        /* Render some data and write it to the dsp */ +        if (PA_SOURCE_OPENED(u->source->thread_info.state)) { +             +            if (u->use_mmap) { +                if (mmap_read(u) < 0) +                    goto fail; + +            } else { +                if (unix_read(u) < 0) +                    goto fail; +            } +        } + +/*         pa_log("loop2"); */ +                  /* Check whether there is a message for us to process */          if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) {              int ret; @@ -550,109 +629,20 @@ static void thread_func(void *userdata) {              continue;          }  -/*         pa_log("loop2"); */ - -        /* Render some data and write it to the dsp */ - -        if (PA_SOURCE_OPENED(u->source->thread_info.state) && (revents & POLLIN)) { -            int work_done = 0; -            pa_assert(u->pcm_handle); - -            if (u->use_mmap) { - -                if ((work_done = mmap_read(u)) < 0) -                    goto fail; - -            } else { - -                for (;;) { -                    void *p; -                    snd_pcm_sframes_t t; -                    ssize_t l; - -                    if ((err = snd_pcm_status(u->pcm_handle, status)) < 0) { -                        pa_log("Failed to query DSP status data: %s", snd_strerror(t)); -                        goto fail; -                    } - -                    if (snd_pcm_status_get_avail_max(status)*u->frame_size >= u->hwbuf_size) -                        pa_log_debug("Buffer overrun!"); -                     -                    l = snd_pcm_status_get_avail(status) * u->frame_size; - -                    if (l <= 0) -                        break; -                     -                    chunk.memblock = pa_memblock_new(u->core->mempool, l); - -                    p = pa_memblock_acquire(chunk.memblock); -                    t = snd_pcm_readi(u->pcm_handle, (uint8_t*) p, l / u->frame_size); -                    pa_memblock_release(chunk.memblock); -                     -/*                     pa_log("wrote %i bytes of %u (%u)", t*u->frame_size, u->memchunk.length, l);   */ -                     -                    pa_assert(t != 0); -                     -                    if (t < 0) { -                        pa_memblock_unref(chunk.memblock); - -                        if ((t = snd_pcm_recover(u->pcm_handle, t, 1)) == 0) -                            continue; -                         -                        if (t == -EAGAIN) { -                            pa_log_debug("EAGAIN"); -                            break; -                        } else { -                            pa_log("Failed to read data from DSP: %s", snd_strerror(t)); -                            goto fail; -                        } -                         -                    }  -                         -                    chunk.index = 0; -                    chunk.length = t * u->frame_size; - -                    pa_source_post(u->source, &chunk); -                    pa_memblock_unref(chunk.memblock); -                     -                    work_done = 1; - -                    if (t * u->frame_size >= (unsigned) l) -                        break; -                }  -            } - -            revents &= ~POLLIN; -             -            if (work_done) -                continue; -        } - -        /* Hmm, nothing to do. Let's sleep */ -        if (pa_asyncmsgq_before_poll(u->thread_mq.inq) < 0) -            continue; - -/*         pa_log("polling for %i", POLLFD_ALSA_BASE + (PA_SOURCE_OPENED(u->source->thread_info.state) ? n_alsa_fds : 0));   */ -        r = poll(u->pollfd, POLLFD_ALSA_BASE + (PA_SOURCE_OPENED(u->source->thread_info.state) ? u->n_alsa_fds : 0), -1); -/*         pa_log("poll end"); */ - -        pa_asyncmsgq_after_poll(u->thread_mq.inq); - -        if (r < 0) { -            if (errno == EINTR) { -                u->pollfd[POLLFD_ASYNCQ].revents = 0; -                revents = 0; -                continue; -            } - +        if (pa_rtpoll_run(u->rtpoll) < 0) {              pa_log("poll() failed: %s", pa_cstrerror(errno));              goto fail;          } -        pa_assert(r > 0); -          if (PA_SOURCE_OPENED(u->source->thread_info.state)) { -            if ((err = snd_pcm_poll_descriptors_revents(u->pcm_handle, u->pollfd + POLLFD_ALSA_BASE, u->n_alsa_fds, &revents)) < 0) { +            struct pollfd *pollfd; +            unsigned short revents = 0; +            int err; +            unsigned n; + +            pollfd = pa_rtpoll_item_get_pollfd(u->alsa_rtpoll_item, &n); +             +            if ((err = snd_pcm_poll_descriptors_revents(u->pcm_handle, pollfd, n, &revents)) < 0) {                  pa_log("snd_pcm_poll_descriptors_revents() failed: %s", snd_strerror(err));                  goto fail;              } @@ -668,10 +658,7 @@ static void thread_func(void *userdata) {                  goto fail;              }  /*             pa_log("got alsa event"); */ -        } else -            revents = 0; -         -        pa_assert((u->pollfd[POLLFD_ASYNCQ].revents & ~POLLIN) == 0); +        }      }  fail: @@ -687,7 +674,6 @@ finish:  int pa__init(pa_module*m) {      pa_modargs *ma = NULL; -    int ret = -1;      struct userdata *u = NULL;      const char *dev;      pa_sample_spec ss; @@ -703,6 +689,8 @@ int pa__init(pa_module*m) {      int namereg_fail;      int use_mmap = 1, b; +    snd_pcm_info_alloca(&pcm_info); +      pa_assert(m);      if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { @@ -739,9 +727,10 @@ int pa__init(pa_module*m) {      u->module = m;      m->userdata = u;      u->use_mmap = use_mmap; -    u->n_alsa_fds = 0; -    u->pollfd = NULL;      pa_thread_mq_init(&u->thread_mq, m->core->mainloop); +    u->rtpoll = pa_rtpoll_new(); +    u->alsa_rtpoll_item = NULL; +    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, u->thread_mq.inq);      snd_config_update_free_global();      if ((err = snd_pcm_open(&u->pcm_handle, dev = pa_modargs_get_value(ma, "device", DEFAULT_DEVICE), SND_PCM_STREAM_CAPTURE, SND_PCM_NONBLOCK)) < 0) { @@ -751,8 +740,7 @@ int pa__init(pa_module*m) {      u->device_name = pa_xstrdup(dev); -    if ((err = snd_pcm_info_malloc(&pcm_info)) < 0 || -        (err = snd_pcm_info(u->pcm_handle, pcm_info)) < 0) { +    if ((err = snd_pcm_info(u->pcm_handle, pcm_info)) < 0) {          pa_log("Error fetching PCM info: %s", snd_strerror(err));          goto fail;      } @@ -875,26 +863,18 @@ int pa__init(pa_module*m) {      if (u->source->get_mute)          u->source->get_mute(u->source); -    snd_pcm_start(u->pcm_handle); +    pa_modargs_free(ma); -    ret = 0; - -finish: - -    if (ma) -        pa_modargs_free(ma); +    return 0; -    if (pcm_info) -        snd_pcm_info_free(pcm_info); - -    return ret; -  fail: -    if (u) -        pa__done(m); +    if (ma) +        pa_modargs_free(ma); -    goto finish; +    pa__done(m); +     +    return -1;  }  void pa__done(pa_module*m) { @@ -918,6 +898,12 @@ void pa__done(pa_module*m) {      if (u->source)          pa_source_unref(u->source); +    if (u->alsa_rtpoll_item) +        pa_rtpoll_item_free(u->alsa_rtpoll_item); +     +    if (u->rtpoll) +        pa_rtpoll_free(u->rtpoll); +          if (u->mixer_fdl)          pa_alsa_fdlist_free(u->mixer_fdl); @@ -929,7 +915,6 @@ void pa__done(pa_module*m) {          snd_pcm_close(u->pcm_handle);      } -    pa_xfree(u->pollfd);      pa_xfree(u->device_name);      pa_xfree(u); diff --git a/src/modules/module-null-sink.c b/src/modules/module-null-sink.c index 8b17b59e..3b512371 100644 --- a/src/modules/module-null-sink.c +++ b/src/modules/module-null-sink.c @@ -33,7 +33,6 @@  #include <fcntl.h>  #include <unistd.h>  #include <limits.h> -#include <sys/poll.h>  #include <pulse/timeval.h>  #include <pulse/xmalloc.h> @@ -47,6 +46,8 @@  #include <pulsecore/log.h>  #include <pulsecore/thread.h>  #include <pulsecore/thread-mq.h> +#include <pulsecore/rtpoll.h> +#include <pulsecore/rtclock.h>  #include "module-null-sink-symdef.h" @@ -67,11 +68,14 @@ struct userdata {      pa_core *core;      pa_module *module;      pa_sink *sink; +      pa_thread *thread;      pa_thread_mq thread_mq; +    pa_rtpoll *rtpoll; +      size_t block_size; -    struct timeval timestamp; +    struct timespec timestamp;  };  static const char* const valid_modargs[] = { @@ -91,19 +95,19 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse          case PA_SINK_MESSAGE_SET_STATE:              if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING) -                pa_gettimeofday(&u->timestamp); +                pa_rtclock_get(&u->timestamp);              break;          case PA_SINK_MESSAGE_GET_LATENCY: { -            struct timeval now; +            struct timespec now; -            pa_gettimeofday(&now); +            pa_rtclock_get(&now); -            if (pa_timeval_cmp(&u->timestamp, &now) > 0) +            if (pa_timespec_cmp(&u->timestamp, &now) > 0)                  *((pa_usec_t*) data) = 0;              else -                *((pa_usec_t*) data) = pa_timeval_diff(&u->timestamp, &now); +                *((pa_usec_t*) data) = pa_timespec_diff(&u->timestamp, &now);              break;          }      } @@ -113,29 +117,41 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse  static void thread_func(void *userdata) {      struct userdata *u = userdata; -    struct pollfd pollfd;      pa_assert(u);      pa_log_debug("Thread starting up");      pa_thread_mq_install(&u->thread_mq); +    pa_rtpoll_install(u->rtpoll); -    pa_gettimeofday(&u->timestamp); - -    memset(&pollfd, 0, sizeof(pollfd)); -    pollfd.fd = pa_asyncmsgq_get_fd(u->thread_mq.inq); -    pollfd.events = POLLIN; +    pa_rtclock_get(&u->timestamp);      for (;;) {          pa_msgobject *object;          int code;          void *data;          pa_memchunk chunk; -        int r, timeout; -        struct timeval now;          int64_t offset; +        /* Render some data and drop it immediately */ +        if (u->sink->thread_info.state == PA_SINK_RUNNING) { +            struct timespec now; +             +            pa_rtclock_get(&now); + +            if (pa_timespec_cmp(&u->timestamp, &now) <= 0) { + +                pa_sink_render(u->sink, u->block_size, &chunk); +                pa_memblock_unref(chunk.memblock); + +                pa_timespec_add(&u->timestamp, pa_bytes_to_usec(chunk.length, &u->sink->sample_spec)); +            } + +            pa_rtpoll_set_timer_absolute(u->rtpoll, &u->timestamp); +        } else +            pa_rtpoll_set_timer_disabled(u->rtpoll); +          /* Check whether there is a message for us to process */          if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) {              int ret; @@ -150,45 +166,11 @@ static void thread_func(void *userdata) {              continue;          } -        /* Render some data and drop it immediately */ -        if (u->sink->thread_info.state == PA_SINK_RUNNING) { -            pa_gettimeofday(&now); - -            if (pa_timeval_cmp(&u->timestamp, &now) <= 0) { - -                pa_sink_render(u->sink, u->block_size, &chunk); -                pa_memblock_unref(chunk.memblock); - -                pa_timeval_add(&u->timestamp, pa_bytes_to_usec(chunk.length, &u->sink->sample_spec)); -                continue; -            } - -            timeout = pa_timeval_diff(&u->timestamp, &now)/1000; - -            if (timeout < 1) -                timeout = 1; -        } else -            timeout = -1; -          /* Hmm, nothing to do. Let's sleep */ - -        if (pa_asyncmsgq_before_poll(u->thread_mq.inq) < 0) -            continue; - -        r = poll(&pollfd, 1, timeout); -        pa_asyncmsgq_after_poll(u->thread_mq.inq); - -        if (r < 0) { -            if (errno == EINTR) { -                pollfd.revents = 0; -                continue; -            } - +        if (pa_rtpoll_run(u->rtpoll) < 0) {              pa_log("poll() failed: %s", pa_cstrerror(errno));              goto fail;          } - -        pa_assert(r == 0 || pollfd.revents == POLLIN);      }  fail: @@ -224,8 +206,9 @@ int pa__init(pa_module*m) {      u->core = m->core;      u->module = m;      m->userdata = u; -      pa_thread_mq_init(&u->thread_mq, m->core->mainloop); +    u->rtpoll = pa_rtpoll_new(); +    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, u->thread_mq.inq);      if (!(u->sink = pa_sink_new(m->core, __FILE__, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME), 0, &ss, &map))) {          pa_log("Failed to create sink."); @@ -282,5 +265,8 @@ void pa__done(pa_module*m) {      if (u->sink)          pa_sink_unref(u->sink); +    if (u->rtpoll) +        pa_rtpoll_free(u->rtpoll); +          pa_xfree(u);  } diff --git a/src/modules/module-oss.c b/src/modules/module-oss.c index f4454813..85fdd9c8 100644 --- a/src/modules/module-oss.c +++ b/src/modules/module-oss.c @@ -32,9 +32,7 @@   *   the device. If none is avilable from the inputs, we write silence   *   instead.   * - *   If power should be saved on IDLE this should be implemented in a - *   special suspend-on-idle module that will put us into SUSPEND mode - *   as soon and we're idle for too long. + *   If power should be saved on IDLE module-suspend-on-idle should be used.   *   */ @@ -48,12 +46,6 @@  #include <sys/mman.h>  #endif -#ifdef HAVE_SYS_POLL_H -#include <sys/poll.h> -#else -#include "poll.h" -#endif -  #include <sys/soundcard.h>  #include <sys/ioctl.h>  #include <stdlib.h> @@ -80,6 +72,7 @@  #include <pulsecore/log.h>  #include <pulsecore/macro.h>  #include <pulsecore/thread-mq.h> +#include <pulsecore/rtpoll.h>  #include "oss-util.h"  #include "module-oss-symdef.h" @@ -108,8 +101,10 @@ struct userdata {      pa_module *module;      pa_sink *sink;      pa_source *source; +          pa_thread *thread;      pa_thread_mq thread_mq; +    pa_rtpoll *rtpoll;      char *device_name; @@ -135,6 +130,8 @@ struct userdata {      pa_memblock **in_mmap_memblocks, **out_mmap_memblocks;      int in_mmap_saved_nfrags, out_mmap_saved_nfrags; + +    pa_rtpoll_item *rtpoll_item;  };  static const char* const valid_modargs[] = { @@ -156,10 +153,12 @@ static const char* const valid_modargs[] = {  static void trigger(struct userdata *u, int quick) {      int enable_bits = 0, zero = 0; +    pa_assert(u); +          if (u->fd < 0)          return; -    pa_log_debug("trigger");  +/*     pa_log_debug("trigger");  */      if (u->source && PA_SOURCE_OPENED(u->source->thread_info.state))          enable_bits |= PCM_ENABLE_INPUT; @@ -479,6 +478,11 @@ static int suspend(struct userdata *u) {      close(u->fd);      u->fd = -1; +    if (u->rtpoll_item) { +        pa_rtpoll_item_free(u->rtpoll_item); +        u->rtpoll_item = NULL; +    } +          pa_log_debug("Device suspended...");      return 0; @@ -490,6 +494,7 @@ static int unsuspend(struct userdata *u) {      int frag_size, in_frag_size, out_frag_size;      int in_nfrags, out_nfrags;      struct audio_buf_info info; +    struct pollfd *pollfd;      pa_assert(u);      pa_assert(u->fd < 0); @@ -568,7 +573,15 @@ static int unsuspend(struct userdata *u) {      u->out_mmap_current = u->in_mmap_current = 0;      u->out_mmap_saved_nfrags = u->in_mmap_saved_nfrags = 0; + +    pa_assert(!u->rtpoll_item); +    u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, 1); +    pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); +    pollfd->fd = u->fd; +    pollfd->events = 0; +    pollfd->revents = 0; +      pa_log_debug("Resumed successfully...");      return 0; @@ -777,15 +790,9 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off  }  static void thread_func(void *userdata) { -    enum { -        POLLFD_ASYNCQ, -        POLLFD_DSP, -        POLLFD_MAX, -    }; -      struct userdata *u = userdata; -    struct pollfd pollfd[POLLFD_MAX];      int write_type = 0, read_type = 0; +    unsigned short revents = 0;      pa_assert(u); @@ -795,46 +802,22 @@ static void thread_func(void *userdata) {          pa_make_realtime();      pa_thread_mq_install(&u->thread_mq); +    pa_rtpoll_install(u->rtpoll);      trigger(u, 0); -    memset(&pollfd, 0, sizeof(pollfd)); - -    pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->thread_mq.inq); -    pollfd[POLLFD_ASYNCQ].events = POLLIN; -    pollfd[POLLFD_DSP].fd = u->fd; -      for (;;) {          pa_msgobject *object;          int code;          void *data;          pa_memchunk chunk; -        int r;          int64_t offset;  /*        pa_log("loop");    */ -        /* Check whether there is a message for us to process */ -        if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) { -            int ret; - -/*             pa_log("processing msg"); */ - -            if (!object && code == PA_MESSAGE_SHUTDOWN) { -                pa_asyncmsgq_done(u->thread_mq.inq, 0); -                goto finish; -            } - -            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); -            pa_asyncmsgq_done(u->thread_mq.inq, ret); -            continue; -        }  - -/*         pa_log("loop2"); */ -          /* Render some data and write it to the dsp */ -        if (u->sink && u->sink->thread_info.state != PA_SINK_DISCONNECTED && u->fd >= 0 && (pollfd[POLLFD_DSP].revents & POLLOUT)) { +        if (u->sink && u->sink->thread_info.state != PA_SINK_DISCONNECTED && u->fd >= 0 && (revents & POLLOUT)) {              if (u->use_mmap) {                  int ret; @@ -842,7 +825,7 @@ static void thread_func(void *userdata) {                  if ((ret = mmap_write(u)) < 0)                      goto fail; -                pollfd[POLLFD_DSP].revents &= ~POLLOUT; +                revents &= ~POLLOUT;                  if (ret > 0)                      continue; @@ -894,7 +877,7 @@ static void thread_func(void *userdata) {                          else if (errno == EAGAIN) {                              pa_log_debug("EAGAIN");  -                            pollfd[POLLFD_DSP].revents &= ~POLLOUT; +                            revents &= ~POLLOUT;                              break;                          } else { @@ -914,7 +897,7 @@ static void thread_func(void *userdata) {                          l -= t; -                        pollfd[POLLFD_DSP].revents &= ~POLLOUT; +                        revents &= ~POLLOUT;                      }                  } while (loop && l > 0); @@ -925,7 +908,7 @@ static void thread_func(void *userdata) {          /* Try to read some data and pass it on to the source driver */ -        if (u->source && u->source->thread_info.state != PA_SOURCE_DISCONNECTED && u->fd >= 0 && ((pollfd[POLLFD_DSP].revents & POLLIN))) { +        if (u->source && u->source->thread_info.state != PA_SOURCE_DISCONNECTED && u->fd >= 0 && ((revents & POLLIN))) {              if (u->use_mmap) {                  int ret; @@ -933,7 +916,7 @@ static void thread_func(void *userdata) {                  if ((ret = mmap_read(u)) < 0)                      goto fail; -                pollfd[POLLFD_DSP].revents &= ~POLLIN; +                revents &= ~POLLIN;                  if (ret > 0)                      continue; @@ -985,7 +968,7 @@ static void thread_func(void *userdata) {                          else if (errno == EAGAIN) {                              pa_log_debug("EAGAIN");  -                            pollfd[POLLFD_DSP].revents &= ~POLLIN; +                            revents &= ~POLLIN;                              break;                          } else { @@ -1002,7 +985,7 @@ static void thread_func(void *userdata) {                          l -= t; -                        pollfd[POLLFD_DSP].revents &= ~POLLIN; +                        revents &= ~POLLIN;                      }                  } while (loop && l > 0); @@ -1010,46 +993,53 @@ static void thread_func(void *userdata) {              }          } -        if (u->fd >= 0) { -            pollfd[POLLFD_DSP].fd = u->fd; -            pollfd[POLLFD_DSP].events = -                ((u->source && PA_SOURCE_OPENED(u->source->thread_info.state)) ? POLLIN : 0) | -                ((u->sink && PA_SINK_OPENED(u->sink->thread_info.state)) ? POLLOUT : 0); -        } -             -        /* Hmm, nothing to do. Let's sleep */ +/*         pa_log("loop2"); */ -        if (pa_asyncmsgq_before_poll(u->thread_mq.inq) < 0) +        /* Check whether there is a message for us to process */ +        if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) { +            int ret; + +/*             pa_log("processing msg"); */ + +            if (!object && code == PA_MESSAGE_SHUTDOWN) { +                pa_asyncmsgq_done(u->thread_mq.inq, 0); +                goto finish; +            } + +            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); +            pa_asyncmsgq_done(u->thread_mq.inq, ret);              continue; +        }  -/*         pa_log("polling for %i (legend: %i=POLLIN, %i=POLLOUT)", u->fd >= 0 ? pollfd[POLLFD_DSP].events : -1, POLLIN, POLLOUT); */ -        r = poll(pollfd, u->fd >= 0 ? POLLFD_MAX : POLLFD_DSP, -1); -/*         pa_log("polling got dsp=%i amq=%i (%i)", r > 0 ? pollfd[POLLFD_DSP].revents : 0, r > 0 ? pollfd[POLLFD_ASYNCQ].revents : 0, r); */ +        if (u->fd >= 0) { +            struct pollfd *pollfd; -        pa_asyncmsgq_after_poll(u->thread_mq.inq); +            pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); +            pollfd->events = +                ((u->source && PA_SOURCE_OPENED(u->source->thread_info.state)) ? POLLIN : 0) | +                ((u->sink && PA_SINK_OPENED(u->sink->thread_info.state)) ? POLLOUT : 0); +        } -        if (u->fd < 0) -            pollfd[POLLFD_DSP].revents = 0; -        if (r < 0) { -            if (errno == EINTR) { -                pollfd[POLLFD_ASYNCQ].revents = 0; -                pollfd[POLLFD_DSP].revents = 0; -                continue; -            } - +        /* Hmm, nothing to do. Let's sleep */ +        if (pa_rtpoll_run(u->rtpoll) < 0) {              pa_log("poll() failed: %s", pa_cstrerror(errno));              goto fail;          } -        pa_assert(r > 0); - -        if (pollfd[POLLFD_DSP].revents & ~(POLLOUT|POLLIN)) { -            pa_log("DSP shutdown."); -            goto fail; -        } +        if (u->fd >= 0) { +            struct pollfd *pollfd; +             +            pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); +             +            if (pollfd->revents & ~(POLLOUT|POLLIN)) { +                pa_log("DSP shutdown."); +                goto fail; +            } -        pa_assert((pollfd[POLLFD_ASYNCQ].revents & ~POLLIN) == 0); +            revents = pollfd->revents; +        } else +            revents = 0;      }  fail: @@ -1077,6 +1067,7 @@ int pa__init(pa_module*m) {      char hwdesc[64], *t;      const char *name;      int namereg_fail; +    struct pollfd *pollfd;      pa_assert(m); @@ -1165,7 +1156,14 @@ int pa__init(pa_module*m) {      u->out_fragment_size = u->in_fragment_size = u->frag_size = frag_size;      u->use_mmap = use_mmap;      pa_thread_mq_init(&u->thread_mq, m->core->mainloop); - +    u->rtpoll = pa_rtpoll_new(); +    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, u->thread_mq.inq); +    u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, 1); +    pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); +    pollfd->fd = fd; +    pollfd->events = 0; +    pollfd->revents = 0; +          if (ioctl(fd, SNDCTL_DSP_GETISPACE, &info) >= 0) {          pa_log_info("Input -- %u fragments of size %u.", info.fragstotal, info.fragsize);          u->in_fragment_size = info.fragsize; @@ -1294,14 +1292,14 @@ go_on:          goto fail;      } -    pa_modargs_free(ma); -      /* Read mixer settings */      if (u->source)          pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->source), PA_SOURCE_MESSAGE_GET_VOLUME, &u->source->volume, 0, NULL, NULL);      if (u->sink)          pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_GET_VOLUME, &u->sink->volume, 0, NULL, NULL); +    pa_modargs_free(ma); +      return 0;  fail: @@ -1343,10 +1341,16 @@ void pa__done(pa_module*m) {      if (u->source)          pa_source_unref(u->source); - +          if (u->memchunk.memblock)          pa_memblock_unref(u->memchunk.memblock); +    if (u->rtpoll_item) +        pa_rtpoll_item_free(u->rtpoll_item); +     +    if (u->rtpoll) +        pa_rtpoll_free(u->rtpoll); +          if (u->out_mmap_memblocks) {          unsigned i;          for (i = 0; i < u->out_nfrags; i++) diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c index 2f82cae8..a1101ab8 100644 --- a/src/modules/module-pipe-sink.c +++ b/src/modules/module-pipe-sink.c @@ -34,7 +34,6 @@  #include <unistd.h>  #include <limits.h>  #include <sys/ioctl.h> -#include <sys/poll.h>  #include <pulse/xmalloc.h> @@ -46,6 +45,7 @@  #include <pulsecore/log.h>  #include <pulsecore/thread.h>  #include <pulsecore/thread-mq.h> +#include <pulsecore/rtpoll.h>  #include "module-pipe-sink-symdef.h" @@ -67,12 +67,17 @@ struct userdata {      pa_core *core;      pa_module *module;      pa_sink *sink; +          pa_thread *thread;      pa_thread_mq thread_mq; +    pa_rtpoll *rtpoll; +          char *filename;      int fd;      pa_memchunk memchunk; + +    pa_rtpoll_item *rtpoll_item;  };  static const char* const valid_modargs[] = { @@ -108,14 +113,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse  }  static void thread_func(void *userdata) { -    enum { -        POLLFD_ASYNCQ, -        POLLFD_FIFO, -        POLLFD_MAX, -    }; -          struct userdata *u = userdata; -    struct pollfd pollfd[POLLFD_MAX];      int write_type = 0;      pa_assert(u); @@ -123,38 +121,21 @@ static void thread_func(void *userdata) {      pa_log_debug("Thread starting up");      pa_thread_mq_install(&u->thread_mq); - -    memset(&pollfd, 0, sizeof(pollfd)); -     -    pollfd[POLLFD_ASYNCQ].fd = pa_asyncmsgq_get_fd(u->thread_mq.inq); -    pollfd[POLLFD_ASYNCQ].events = POLLIN; -    pollfd[POLLFD_FIFO].fd = u->fd; +    pa_rtpoll_install(u->rtpoll);      for (;;) {          pa_msgobject *object;          int code;          void *data;          pa_memchunk chunk; -        int r;          int64_t offset; - -        /* Check whether there is a message for us to process */ -        if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) { -            int ret; - -            if (!object && code == PA_MESSAGE_SHUTDOWN) { -                pa_asyncmsgq_done(u->thread_mq.inq, 0); -                goto finish; -            } - -            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); -            pa_asyncmsgq_done(u->thread_mq.inq, ret); -            continue; -        } +        struct pollfd *pollfd;          /* Render some data and write it to the fifo */ -        if (u->sink->thread_info.state == PA_SINK_RUNNING && pollfd[POLLFD_FIFO].revents) { +        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); +         +        if (u->sink->thread_info.state == PA_SINK_RUNNING && pollfd->revents) {              ssize_t l;              void *p; @@ -188,41 +169,37 @@ static void thread_func(void *userdata) {                      pa_memchunk_reset(&u->memchunk);                  } -                pollfd[POLLFD_FIFO].revents = 0; -                continue; +                pollfd->revents = 0;              }          } -        pollfd[POLLFD_FIFO].events = u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0; +        /* Check whether there is a message for us to process */ +        if (pa_asyncmsgq_get(u->thread_mq.inq, &object, &code, &data, &offset, &chunk, 0) == 0) { +            int ret; -        /* Hmm, nothing to do. Let's sleep */ +            if (!object && code == PA_MESSAGE_SHUTDOWN) { +                pa_asyncmsgq_done(u->thread_mq.inq, 0); +                goto finish; +            } -        if (pa_asyncmsgq_before_poll(u->thread_mq.inq) < 0) +            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); +            pa_asyncmsgq_done(u->thread_mq.inq, ret);              continue; +        } +         +        /* Hmm, nothing to do. Let's sleep */ +        pollfd->events = u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0; -/*         pa_log("polling for %u", pollfd[POLLFD_FIFO].events);  */ -        r = poll(pollfd, POLLFD_MAX, -1); -/*         pa_log("polling got %u", r > 0 ? pollfd[POLLFD_FIFO].revents : 0);  */ - -        pa_asyncmsgq_after_poll(u->thread_mq.inq); - -        if (r < 0) { -            if (errno == EINTR) { -                pollfd[POLLFD_ASYNCQ].revents = 0; -                pollfd[POLLFD_FIFO].revents = 0; -                continue; -            } - +        if (pa_rtpoll_run(u->rtpoll) < 0) {              pa_log("poll() failed: %s", pa_cstrerror(errno));              goto fail;          } -        if (pollfd[POLLFD_FIFO].revents & ~POLLOUT) { +        pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); +        if (pollfd->revents & ~POLLOUT) {              pa_log("FIFO shutdown.");              goto fail;          } - -        pa_assert((pollfd[POLLFD_ASYNCQ].revents & ~POLLIN) == 0);      }  fail: @@ -242,6 +219,7 @@ int pa__init(pa_module*m) {      pa_channel_map map;      pa_modargs *ma;      char *t; +    struct pollfd *pollfd;      pa_assert(m); @@ -262,6 +240,8 @@ int pa__init(pa_module*m) {      m->userdata = u;      pa_memchunk_reset(&u->memchunk);      pa_thread_mq_init(&u->thread_mq, m->core->mainloop); +    u->rtpoll = pa_rtpoll_new(); +    pa_rtpoll_item_new_asyncmsgq(u->rtpoll, u->thread_mq.inq);      u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); @@ -297,6 +277,11 @@ int pa__init(pa_module*m) {      pa_sink_set_description(u->sink, t = pa_sprintf_malloc("Unix FIFO sink '%s'", u->filename));      pa_xfree(t); +    u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, 1); +    pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); +    pollfd->fd = u->fd; +    pollfd->events = pollfd->revents = 0; +         if (!(u->thread = pa_thread_new(thread_func, u))) {          pa_log("Failed to create thread.");          goto fail; @@ -339,6 +324,12 @@ void pa__done(pa_module*m) {      if (u->memchunk.memblock)         pa_memblock_unref(u->memchunk.memblock); +    if (u->rtpoll_item) +        pa_rtpoll_item_free(u->rtpoll_item); +     +    if (u->rtpoll) +        pa_rtpoll_free(u->rtpoll); +      if (u->filename) {          unlink(u->filename);          pa_xfree(u->filename);  | 
