diff options
author | Lennart Poettering <lennart@poettering.net> | 2008-04-23 18:26:48 +0000 |
---|---|---|
committer | Lennart Poettering <lennart@poettering.net> | 2008-04-23 18:26:48 +0000 |
commit | 76031df4a4a156b7a6a9723b108bfdb37521ef7c (patch) | |
tree | 6c52c49bca198ab175b88fe67b9e84658bcd5e4e /src/pulsecore/asyncq.c | |
parent | a197644ea2cac5e35f2ca6d3d2af149ebedc13ba (diff) |
Big pile of interdependant changes:
* Fix a deadlock when an asyncq overflows and an RT thread needed to wait until space became available again while the main thread was waiting for a operation to complete and thus didn't free any new items. Now, if the asyncq overflows, queue those items temporarily, and return immediately. Then, when the queue becomes writable again, flush it.
* Modify pa_thread_mq_init() to also set up pa_rtpoll events properly for the MQ
* Some more pa_bool_t'ization
* Unify more common code between alsa-sink and alsa-source
* The upper limit for the tsched watermark is max_use minus one frame
* make module-alsa-source work
* make the alsa modules use pa_alsa_build_pollfd() now
* fix detection of dB scale for alsa-source
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/glitch-free@2308 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src/pulsecore/asyncq.c')
-rw-r--r-- | src/pulsecore/asyncq.c | 124 |
1 files changed, 119 insertions, 5 deletions
diff --git a/src/pulsecore/asyncq.c b/src/pulsecore/asyncq.c index 75b15c0e..34506e49 100644 --- a/src/pulsecore/asyncq.c +++ b/src/pulsecore/asyncq.c @@ -33,6 +33,8 @@ #include <pulsecore/thread.h> #include <pulsecore/macro.h> #include <pulsecore/core-util.h> +#include <pulsecore/llist.h> +#include <pulsecore/flist.h> #include <pulse/xmalloc.h> #include "asyncq.h" @@ -51,13 +53,24 @@ #define _Y do { } while(0) #endif +struct localq { + void *data; + PA_LLIST_FIELDS(struct localq); +}; + struct pa_asyncq { unsigned size; unsigned read_idx; unsigned write_idx; pa_fdsem *read_fdsem, *write_fdsem; + + PA_LLIST_HEAD(struct localq, localq); + struct localq *last_localq; + pa_bool_t waiting_for_post; }; +PA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree); + #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq)))) static int is_power_of_two(unsigned size) { @@ -80,6 +93,10 @@ pa_asyncq *pa_asyncq_new(unsigned size) { l->size = size; + PA_LLIST_HEAD_INIT(struct localq, l->localq); + l->last_localq = NULL; + l->waiting_for_post = FALSE; + if (!(l->read_fdsem = pa_fdsem_new())) { pa_xfree(l); return NULL; @@ -95,6 +112,7 @@ pa_asyncq *pa_asyncq_new(unsigned size) { } void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) { + struct localq *q; pa_assert(l); if (free_cb) { @@ -104,12 +122,22 @@ void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) { free_cb(p); } + while ((q = l->localq)) { + if (free_cb) + free_cb(q->data); + + PA_LLIST_REMOVE(struct localq, l->localq, q); + + if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0) + pa_xfree(q); + } + pa_fdsem_free(l->read_fdsem); pa_fdsem_free(l->write_fdsem); pa_xfree(l); } -int pa_asyncq_push(pa_asyncq*l, void *p, int wait) { +static int push(pa_asyncq*l, void *p, pa_bool_t wait) { int idx; pa_atomic_ptr_t *cells; @@ -141,7 +169,63 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) { return 0; } -void* pa_asyncq_pop(pa_asyncq*l, int wait) { +static pa_bool_t flush_postq(pa_asyncq *l) { + struct localq *q; + + pa_assert(l); + + while ((q = l->last_localq)) { + + if (push(l, q->data, FALSE) < 0) + return FALSE; + + l->last_localq = q->prev; + + PA_LLIST_REMOVE(struct localq, l->localq, q); + + if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0) + pa_xfree(q); + } + + return TRUE; +} + +int pa_asyncq_push(pa_asyncq*l, void *p, pa_bool_t wait) { + pa_assert(l); + + if (!flush_postq(l)) + return -1; + + return push(l, p, wait); +} + +void pa_asyncq_post(pa_asyncq*l, void *p) { + struct localq *q; + + pa_assert(l); + pa_assert(p); + + if (pa_asyncq_push(l, p, FALSE) >= 0) + return; + + /* OK, we couldn't push anything in the queue. So let's queue it + * locally and push it later */ + + pa_log("q overrun, queuing locally"); + + if (!(q = pa_flist_pop(PA_STATIC_FLIST_GET(localq)))) + q = pa_xnew(struct localq, 1); + + q->data = p; + PA_LLIST_PREPEND(struct localq, l->localq, q); + + if (!l->last_localq) + l->last_localq = q; + + return; +} + +void* pa_asyncq_pop(pa_asyncq*l, pa_bool_t wait) { int idx; void *ret; pa_atomic_ptr_t *cells; @@ -178,13 +262,13 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) { return ret; } -int pa_asyncq_get_fd(pa_asyncq *q) { +int pa_asyncq_read_fd(pa_asyncq *q) { pa_assert(q); return pa_fdsem_get(q->write_fdsem); } -int pa_asyncq_before_poll(pa_asyncq *l) { +int pa_asyncq_read_before_poll(pa_asyncq *l) { int idx; pa_atomic_ptr_t *cells; @@ -206,8 +290,38 @@ int pa_asyncq_before_poll(pa_asyncq *l) { return 0; } -void pa_asyncq_after_poll(pa_asyncq *l) { +void pa_asyncq_read_after_poll(pa_asyncq *l) { pa_assert(l); pa_fdsem_after_poll(l->write_fdsem); } + +int pa_asyncq_write_fd(pa_asyncq *q) { + pa_assert(q); + + return pa_fdsem_get(q->read_fdsem); +} + +void pa_asyncq_write_before_poll(pa_asyncq *l) { + pa_assert(l); + + for (;;) { + + if (flush_postq(l)) + break; + + if (pa_fdsem_before_poll(l->read_fdsem) >= 0) { + l->waiting_for_post = TRUE; + break; + } + } +} + +void pa_asyncq_write_after_poll(pa_asyncq *l) { + pa_assert(l); + + if (l->waiting_for_post) { + pa_fdsem_after_poll(l->read_fdsem); + l->waiting_for_post = FALSE; + } +} |