summaryrefslogtreecommitdiffstats
path: root/src/pulsecore
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2008-04-23 18:26:48 +0000
committerLennart Poettering <lennart@poettering.net>2008-04-23 18:26:48 +0000
commit76031df4a4a156b7a6a9723b108bfdb37521ef7c (patch)
tree6c52c49bca198ab175b88fe67b9e84658bcd5e4e /src/pulsecore
parenta197644ea2cac5e35f2ca6d3d2af149ebedc13ba (diff)
Big pile of interdependant changes:
* Fix a deadlock when an asyncq overflows and an RT thread needed to wait until space became available again while the main thread was waiting for a operation to complete and thus didn't free any new items. Now, if the asyncq overflows, queue those items temporarily, and return immediately. Then, when the queue becomes writable again, flush it. * Modify pa_thread_mq_init() to also set up pa_rtpoll events properly for the MQ * Some more pa_bool_t'ization * Unify more common code between alsa-sink and alsa-source * The upper limit for the tsched watermark is max_use minus one frame * make module-alsa-source work * make the alsa modules use pa_alsa_build_pollfd() now * fix detection of dB scale for alsa-source git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/glitch-free@2308 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src/pulsecore')
-rw-r--r--src/pulsecore/asyncmsgq.c36
-rw-r--r--src/pulsecore/asyncmsgq.h15
-rw-r--r--src/pulsecore/asyncq.c124
-rw-r--r--src/pulsecore/asyncq.h21
-rw-r--r--src/pulsecore/rtpoll.c55
-rw-r--r--src/pulsecore/rtpoll.h3
-rw-r--r--src/pulsecore/thread-mq.c35
-rw-r--r--src/pulsecore/thread-mq.h5
8 files changed, 248 insertions, 46 deletions
diff --git a/src/pulsecore/asyncmsgq.c b/src/pulsecore/asyncmsgq.c
index 96b43a71..eba1c2cb 100644
--- a/src/pulsecore/asyncmsgq.c
+++ b/src/pulsecore/asyncmsgq.c
@@ -136,7 +136,7 @@ void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const vo
/* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
pa_mutex_lock(a->mutex);
- pa_assert_se(pa_asyncq_push(a->asyncq, i, 1) == 0);
+ pa_asyncq_post(a->asyncq, i);
pa_mutex_unlock(a->mutex);
}
@@ -163,7 +163,7 @@ int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const voi
/* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
pa_mutex_lock(a->mutex);
- pa_assert_se(pa_asyncq_push(a->asyncq, &i, 1) == 0);
+ pa_assert_se(pa_asyncq_push(a->asyncq, &i, TRUE) == 0);
pa_mutex_unlock(a->mutex);
pa_semaphore_wait(i.semaphore);
@@ -174,7 +174,7 @@ int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const voi
return i.ret;
}
-int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, int wait) {
+int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, pa_bool_t wait) {
pa_assert(PA_REFCNT_VALUE(a) > 0);
pa_assert(!a->current);
@@ -276,22 +276,40 @@ int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
return 1;
}
-int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
+int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
pa_assert(PA_REFCNT_VALUE(a) > 0);
- return pa_asyncq_get_fd(a->asyncq);
+ return pa_asyncq_read_fd(a->asyncq);
}
-int pa_asyncmsgq_before_poll(pa_asyncmsgq *a) {
+int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
pa_assert(PA_REFCNT_VALUE(a) > 0);
- return pa_asyncq_before_poll(a->asyncq);
+ return pa_asyncq_read_before_poll(a->asyncq);
}
-void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
+void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
pa_assert(PA_REFCNT_VALUE(a) > 0);
- pa_asyncq_after_poll(a->asyncq);
+ pa_asyncq_read_after_poll(a->asyncq);
+}
+
+int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
+ pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+ return pa_asyncq_write_fd(a->asyncq);
+}
+
+void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
+ pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+ pa_asyncq_write_before_poll(a->asyncq);
+}
+
+void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
+ pa_assert(PA_REFCNT_VALUE(a) > 0);
+
+ pa_asyncq_write_after_poll(a->asyncq);
}
int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
diff --git a/src/pulsecore/asyncmsgq.h b/src/pulsecore/asyncmsgq.h
index 575f760f..93f1ce86 100644
--- a/src/pulsecore/asyncmsgq.h
+++ b/src/pulsecore/asyncmsgq.h
@@ -62,15 +62,20 @@ void pa_asyncmsgq_unref(pa_asyncmsgq* q);
void pa_asyncmsgq_post(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk, pa_free_cb_t userdata_free_cb);
int pa_asyncmsgq_send(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk);
-int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *memchunk, int wait);
+int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *memchunk, pa_bool_t wait);
int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk);
void pa_asyncmsgq_done(pa_asyncmsgq *q, int ret);
int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code);
int pa_asyncmsgq_process_one(pa_asyncmsgq *a);
-/* Just for the reading side */
-int pa_asyncmsgq_get_fd(pa_asyncmsgq *q);
-int pa_asyncmsgq_before_poll(pa_asyncmsgq *a);
-void pa_asyncmsgq_after_poll(pa_asyncmsgq *a);
+/* For the reading side */
+int pa_asyncmsgq_read_fd(pa_asyncmsgq *q);
+int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a);
+void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a);
+
+/* For the write side */
+int pa_asyncmsgq_write_fd(pa_asyncmsgq *q);
+void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a);
+void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a);
#endif
diff --git a/src/pulsecore/asyncq.c b/src/pulsecore/asyncq.c
index 75b15c0e..34506e49 100644
--- a/src/pulsecore/asyncq.c
+++ b/src/pulsecore/asyncq.c
@@ -33,6 +33,8 @@
#include <pulsecore/thread.h>
#include <pulsecore/macro.h>
#include <pulsecore/core-util.h>
+#include <pulsecore/llist.h>
+#include <pulsecore/flist.h>
#include <pulse/xmalloc.h>
#include "asyncq.h"
@@ -51,13 +53,24 @@
#define _Y do { } while(0)
#endif
+struct localq {
+ void *data;
+ PA_LLIST_FIELDS(struct localq);
+};
+
struct pa_asyncq {
unsigned size;
unsigned read_idx;
unsigned write_idx;
pa_fdsem *read_fdsem, *write_fdsem;
+
+ PA_LLIST_HEAD(struct localq, localq);
+ struct localq *last_localq;
+ pa_bool_t waiting_for_post;
};
+PA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree);
+
#define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq))))
static int is_power_of_two(unsigned size) {
@@ -80,6 +93,10 @@ pa_asyncq *pa_asyncq_new(unsigned size) {
l->size = size;
+ PA_LLIST_HEAD_INIT(struct localq, l->localq);
+ l->last_localq = NULL;
+ l->waiting_for_post = FALSE;
+
if (!(l->read_fdsem = pa_fdsem_new())) {
pa_xfree(l);
return NULL;
@@ -95,6 +112,7 @@ pa_asyncq *pa_asyncq_new(unsigned size) {
}
void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
+ struct localq *q;
pa_assert(l);
if (free_cb) {
@@ -104,12 +122,22 @@ void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) {
free_cb(p);
}
+ while ((q = l->localq)) {
+ if (free_cb)
+ free_cb(q->data);
+
+ PA_LLIST_REMOVE(struct localq, l->localq, q);
+
+ if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
+ pa_xfree(q);
+ }
+
pa_fdsem_free(l->read_fdsem);
pa_fdsem_free(l->write_fdsem);
pa_xfree(l);
}
-int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
+static int push(pa_asyncq*l, void *p, pa_bool_t wait) {
int idx;
pa_atomic_ptr_t *cells;
@@ -141,7 +169,63 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) {
return 0;
}
-void* pa_asyncq_pop(pa_asyncq*l, int wait) {
+static pa_bool_t flush_postq(pa_asyncq *l) {
+ struct localq *q;
+
+ pa_assert(l);
+
+ while ((q = l->last_localq)) {
+
+ if (push(l, q->data, FALSE) < 0)
+ return FALSE;
+
+ l->last_localq = q->prev;
+
+ PA_LLIST_REMOVE(struct localq, l->localq, q);
+
+ if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0)
+ pa_xfree(q);
+ }
+
+ return TRUE;
+}
+
+int pa_asyncq_push(pa_asyncq*l, void *p, pa_bool_t wait) {
+ pa_assert(l);
+
+ if (!flush_postq(l))
+ return -1;
+
+ return push(l, p, wait);
+}
+
+void pa_asyncq_post(pa_asyncq*l, void *p) {
+ struct localq *q;
+
+ pa_assert(l);
+ pa_assert(p);
+
+ if (pa_asyncq_push(l, p, FALSE) >= 0)
+ return;
+
+ /* OK, we couldn't push anything in the queue. So let's queue it
+ * locally and push it later */
+
+ pa_log("q overrun, queuing locally");
+
+ if (!(q = pa_flist_pop(PA_STATIC_FLIST_GET(localq))))
+ q = pa_xnew(struct localq, 1);
+
+ q->data = p;
+ PA_LLIST_PREPEND(struct localq, l->localq, q);
+
+ if (!l->last_localq)
+ l->last_localq = q;
+
+ return;
+}
+
+void* pa_asyncq_pop(pa_asyncq*l, pa_bool_t wait) {
int idx;
void *ret;
pa_atomic_ptr_t *cells;
@@ -178,13 +262,13 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) {
return ret;
}
-int pa_asyncq_get_fd(pa_asyncq *q) {
+int pa_asyncq_read_fd(pa_asyncq *q) {
pa_assert(q);
return pa_fdsem_get(q->write_fdsem);
}
-int pa_asyncq_before_poll(pa_asyncq *l) {
+int pa_asyncq_read_before_poll(pa_asyncq *l) {
int idx;
pa_atomic_ptr_t *cells;
@@ -206,8 +290,38 @@ int pa_asyncq_before_poll(pa_asyncq *l) {
return 0;
}
-void pa_asyncq_after_poll(pa_asyncq *l) {
+void pa_asyncq_read_after_poll(pa_asyncq *l) {
pa_assert(l);
pa_fdsem_after_poll(l->write_fdsem);
}
+
+int pa_asyncq_write_fd(pa_asyncq *q) {
+ pa_assert(q);
+
+ return pa_fdsem_get(q->read_fdsem);
+}
+
+void pa_asyncq_write_before_poll(pa_asyncq *l) {
+ pa_assert(l);
+
+ for (;;) {
+
+ if (flush_postq(l))
+ break;
+
+ if (pa_fdsem_before_poll(l->read_fdsem) >= 0) {
+ l->waiting_for_post = TRUE;
+ break;
+ }
+ }
+}
+
+void pa_asyncq_write_after_poll(pa_asyncq *l) {
+ pa_assert(l);
+
+ if (l->waiting_for_post) {
+ pa_fdsem_after_poll(l->read_fdsem);
+ l->waiting_for_post = FALSE;
+ }
+}
diff --git a/src/pulsecore/asyncq.h b/src/pulsecore/asyncq.h
index 53d45866..4cdf8cd0 100644
--- a/src/pulsecore/asyncq.h
+++ b/src/pulsecore/asyncq.h
@@ -26,6 +26,7 @@
#include <sys/types.h>
#include <pulse/def.h>
+#include <pulsecore/macro.h>
/* A simple, asynchronous, lock-free (if requested also wait-free)
* queue. Not multiple-reader/multiple-writer safe. If that is
@@ -46,11 +47,21 @@ typedef struct pa_asyncq pa_asyncq;
pa_asyncq* pa_asyncq_new(unsigned size);
void pa_asyncq_free(pa_asyncq* q, pa_free_cb_t free_cb);
-void* pa_asyncq_pop(pa_asyncq *q, int wait);
-int pa_asyncq_push(pa_asyncq *q, void *p, int wait);
+void* pa_asyncq_pop(pa_asyncq *q, pa_bool_t wait);
+int pa_asyncq_push(pa_asyncq *q, void *p, pa_bool_t wait);
-int pa_asyncq_get_fd(pa_asyncq *q);
-int pa_asyncq_before_poll(pa_asyncq *a);
-void pa_asyncq_after_poll(pa_asyncq *a);
+/* Similar to pa_asyncq_push(), but if the queue is full, postpone it
+ * locally and delay until pa_asyncq_before_poll_post() */
+void pa_asyncq_post(pa_asyncq*l, void *p);
+
+/* For the reading side */
+int pa_asyncq_read_fd(pa_asyncq *q);
+int pa_asyncq_read_before_poll(pa_asyncq *a);
+void pa_asyncq_read_after_poll(pa_asyncq *a);
+
+/* For the writing side */
+int pa_asyncq_write_fd(pa_asyncq *q);
+void pa_asyncq_write_before_poll(pa_asyncq *a);
+void pa_asyncq_write_after_poll(pa_asyncq *a);
#endif
diff --git a/src/pulsecore/rtpoll.c b/src/pulsecore/rtpoll.c
index 734f344f..c3e76cac 100644
--- a/src/pulsecore/rtpoll.c
+++ b/src/pulsecore/rtpoll.c
@@ -661,23 +661,23 @@ pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio
return i;
}
-static int asyncmsgq_before(pa_rtpoll_item *i) {
+static int asyncmsgq_read_before(pa_rtpoll_item *i) {
pa_assert(i);
- if (pa_asyncmsgq_before_poll(i->userdata) < 0)
+ if (pa_asyncmsgq_read_before_poll(i->userdata) < 0)
return 1; /* 1 means immediate restart of the loop */
return 0;
}
-static void asyncmsgq_after(pa_rtpoll_item *i) {
+static void asyncmsgq_read_after(pa_rtpoll_item *i) {
pa_assert(i);
pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
- pa_asyncmsgq_after_poll(i->userdata);
+ pa_asyncmsgq_read_after_poll(i->userdata);
}
-static int asyncmsgq_work(pa_rtpoll_item *i) {
+static int asyncmsgq_read_work(pa_rtpoll_item *i) {
pa_msgobject *object;
int code;
void *data;
@@ -703,7 +703,7 @@ static int asyncmsgq_work(pa_rtpoll_item *i) {
return 0;
}
-pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
+pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
pa_rtpoll_item *i;
struct pollfd *pollfd;
@@ -713,12 +713,47 @@ pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t
i = pa_rtpoll_item_new(p, prio, 1);
pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
- pollfd->fd = pa_asyncmsgq_get_fd(q);
+ pollfd->fd = pa_asyncmsgq_read_fd(q);
pollfd->events = POLLIN;
- i->before_cb = asyncmsgq_before;
- i->after_cb = asyncmsgq_after;
- i->work_cb = asyncmsgq_work;
+ i->before_cb = asyncmsgq_read_before;
+ i->after_cb = asyncmsgq_read_after;
+ i->work_cb = asyncmsgq_read_work;
+ i->userdata = q;
+
+ return i;
+}
+
+static int asyncmsgq_write_before(pa_rtpoll_item *i) {
+ pa_assert(i);
+
+ pa_asyncmsgq_write_before_poll(i->userdata);
+ return 0;
+}
+
+static void asyncmsgq_write_after(pa_rtpoll_item *i) {
+ pa_assert(i);
+
+ pa_assert((i->pollfd[0].revents & ~POLLIN) == 0);
+ pa_asyncmsgq_write_after_poll(i->userdata);
+}
+
+pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) {
+ pa_rtpoll_item *i;
+ struct pollfd *pollfd;
+
+ pa_assert(p);
+ pa_assert(q);
+
+ i = pa_rtpoll_item_new(p, prio, 1);
+
+ pollfd = pa_rtpoll_item_get_pollfd(i, NULL);
+ pollfd->fd = pa_asyncmsgq_write_fd(q);
+ pollfd->events = POLLIN;
+
+ i->before_cb = asyncmsgq_write_before;
+ i->after_cb = asyncmsgq_write_after;
+ i->work_cb = NULL;
i->userdata = q;
return i;
diff --git a/src/pulsecore/rtpoll.h b/src/pulsecore/rtpoll.h
index f7f96e67..6d72eb54 100644
--- a/src/pulsecore/rtpoll.h
+++ b/src/pulsecore/rtpoll.h
@@ -106,7 +106,8 @@ void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata);
void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i);
pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *s);
-pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q);
+pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q);
+pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q);
/* Requests the loop to exit. Will cause the next iteration of
* pa_rtpoll_run() to return 0 */
diff --git a/src/pulsecore/thread-mq.c b/src/pulsecore/thread-mq.c
index 9b879425..7e39c577 100644
--- a/src/pulsecore/thread-mq.c
+++ b/src/pulsecore/thread-mq.c
@@ -43,15 +43,15 @@
PA_STATIC_TLS_DECLARE_NO_FREE(thread_mq);
-static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
+static void asyncmsgq_read_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
pa_thread_mq *q = userdata;
pa_asyncmsgq *aq;
- pa_assert(pa_asyncmsgq_get_fd(q->outq) == fd);
+ pa_assert(pa_asyncmsgq_read_fd(q->outq) == fd);
pa_assert(events == PA_IO_EVENT_INPUT);
pa_asyncmsgq_ref(aq = q->outq);
- pa_asyncmsgq_after_poll(aq);
+ pa_asyncmsgq_write_after_poll(aq);
for (;;) {
pa_msgobject *object;
@@ -68,14 +68,24 @@ static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_even
pa_asyncmsgq_done(aq, ret);
}
- if (pa_asyncmsgq_before_poll(aq) == 0)
+ if (pa_asyncmsgq_read_before_poll(aq) == 0)
break;
}
pa_asyncmsgq_unref(aq);
}
-void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) {
+static void asyncmsgq_write_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
+ pa_thread_mq *q = userdata;
+
+ pa_assert(pa_asyncmsgq_write_fd(q->inq) == fd);
+ pa_assert(events == PA_IO_EVENT_INPUT);
+
+ pa_asyncmsgq_write_after_poll(q->inq);
+ pa_asyncmsgq_write_before_poll(q->inq);
+}
+
+void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll) {
pa_assert(q);
pa_assert(mainloop);
@@ -83,15 +93,22 @@ void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) {
pa_assert_se(q->inq = pa_asyncmsgq_new(0));
pa_assert_se(q->outq = pa_asyncmsgq_new(0));
- pa_assert_se(pa_asyncmsgq_before_poll(q->outq) == 0);
- pa_assert_se(q->io_event = mainloop->io_new(mainloop, pa_asyncmsgq_get_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_cb, q));
+ pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0);
+ pa_assert_se(q->read_event = mainloop->io_new(mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q));
+
+ pa_asyncmsgq_write_before_poll(q->inq);
+ pa_assert_se(q->write_event = mainloop->io_new(mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_cb, q));
+
+ pa_rtpoll_item_new_asyncmsgq_read(rtpoll, PA_RTPOLL_EARLY, q->inq);
+ pa_rtpoll_item_new_asyncmsgq_write(rtpoll, PA_RTPOLL_LATE, q->outq);
}
void pa_thread_mq_done(pa_thread_mq *q) {
pa_assert(q);
- q->mainloop->io_free(q->io_event);
- q->io_event = NULL;
+ q->mainloop->io_free(q->read_event);
+ q->mainloop->io_free(q->write_event);
+ q->read_event = q->write_event = NULL;
pa_asyncmsgq_unref(q->inq);
pa_asyncmsgq_unref(q->outq);
diff --git a/src/pulsecore/thread-mq.h b/src/pulsecore/thread-mq.h
index 13b6e01f..0ae49f8c 100644
--- a/src/pulsecore/thread-mq.h
+++ b/src/pulsecore/thread-mq.h
@@ -26,6 +26,7 @@
#include <pulse/mainloop-api.h>
#include <pulsecore/asyncmsgq.h>
+#include <pulsecore/rtpoll.h>
/* Two way communication between a thread and a mainloop. Before the
* thread is started a pa_pthread_mq should be initialized and than
@@ -34,10 +35,10 @@
typedef struct pa_thread_mq {
pa_mainloop_api *mainloop;
pa_asyncmsgq *inq, *outq;
- pa_io_event *io_event;
+ pa_io_event *read_event, *write_event;
} pa_thread_mq;
-void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop);
+void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll);
void pa_thread_mq_done(pa_thread_mq *q);
/* Install the specified pa_thread_mq object for the current thread */