diff options
Diffstat (limited to 'src/pulsecore/thread-mq.c')
-rw-r--r-- | src/pulsecore/thread-mq.c | 41 |
1 files changed, 28 insertions, 13 deletions
diff --git a/src/pulsecore/thread-mq.c b/src/pulsecore/thread-mq.c index d572f6e0..34f92a7e 100644 --- a/src/pulsecore/thread-mq.c +++ b/src/pulsecore/thread-mq.c @@ -1,5 +1,3 @@ -/* $Id$ */ - /*** This file is part of PulseAudio. @@ -43,15 +41,15 @@ PA_STATIC_TLS_DECLARE_NO_FREE(thread_mq); -static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { +static void asyncmsgq_read_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { pa_thread_mq *q = userdata; pa_asyncmsgq *aq; - pa_assert(pa_asyncmsgq_get_fd(q->outq) == fd); + pa_assert(pa_asyncmsgq_read_fd(q->outq) == fd); pa_assert(events == PA_IO_EVENT_INPUT); pa_asyncmsgq_ref(aq = q->outq); - pa_asyncmsgq_after_poll(aq); + pa_asyncmsgq_write_after_poll(aq); for (;;) { pa_msgobject *object; @@ -68,35 +66,52 @@ static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_even pa_asyncmsgq_done(aq, ret); } - if (pa_asyncmsgq_before_poll(aq) == 0) + if (pa_asyncmsgq_read_before_poll(aq) == 0) break; } pa_asyncmsgq_unref(aq); } -void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) { +static void asyncmsgq_write_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { + pa_thread_mq *q = userdata; + + pa_assert(pa_asyncmsgq_write_fd(q->inq) == fd); + pa_assert(events == PA_IO_EVENT_INPUT); + + pa_asyncmsgq_write_after_poll(q->inq); + pa_asyncmsgq_write_before_poll(q->inq); +} + +void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll) { pa_assert(q); pa_assert(mainloop); q->mainloop = mainloop; pa_assert_se(q->inq = pa_asyncmsgq_new(0)); pa_assert_se(q->outq = pa_asyncmsgq_new(0)); - - pa_assert_se(pa_asyncmsgq_before_poll(q->outq) == 0); - pa_assert_se(q->io_event = mainloop->io_new(mainloop, pa_asyncmsgq_get_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_cb, q)); + + pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0); + pa_assert_se(q->read_event = mainloop->io_new(mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q)); + + pa_asyncmsgq_write_before_poll(q->inq); + pa_assert_se(q->write_event = mainloop->io_new(mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_cb, q)); + + pa_rtpoll_item_new_asyncmsgq_read(rtpoll, PA_RTPOLL_EARLY, q->inq); + pa_rtpoll_item_new_asyncmsgq_write(rtpoll, PA_RTPOLL_LATE, q->outq); } void pa_thread_mq_done(pa_thread_mq *q) { pa_assert(q); - q->mainloop->io_free(q->io_event); - q->io_event = NULL; + q->mainloop->io_free(q->read_event); + q->mainloop->io_free(q->write_event); + q->read_event = q->write_event = NULL; pa_asyncmsgq_unref(q->inq); pa_asyncmsgq_unref(q->outq); q->inq = q->outq = NULL; - + q->mainloop = NULL; } |