diff options
| author | Lennart Poettering <lennart@poettering.net> | 2007-08-10 22:01:17 +0000 | 
|---|---|---|
| committer | Lennart Poettering <lennart@poettering.net> | 2007-08-10 22:01:17 +0000 | 
| commit | f7171e86caef4f71f58d4f65d9cada4e53a19396 (patch) | |
| tree | b673390b74ccb40174b192aea274715b97408e4e | |
| parent | aff77c162bad2c9375a908a871f01a6fddd02278 (diff) | |
Wrap two pa_asyncmsq in a new pa_thread_mq object for bidirectional, lock-free communication between a main loop and a thread
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1622 fefdeb5f-60dc-0310-8127-8f9354f1896f
| -rw-r--r-- | src/Makefile.am | 1 | ||||
| -rw-r--r-- | src/pulsecore/thread-mq.c | 119 | ||||
| -rw-r--r-- | src/pulsecore/thread-mq.h | 49 | 
3 files changed, 169 insertions, 0 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 6cb3f288..2d3af078 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -662,6 +662,7 @@ libpulsecore_la_SOURCES += \  		pulsecore/flist.c pulsecore/flist.h \  		pulsecore/asyncmsgq.c pulsecore/asyncmsgqq.h \  		pulsecore/asyncq.c pulsecore/asyncq.h \ +		pulsecore/thread-mq.c pulsecore/thread-mq.h \  		pulsecore/fdsem.c pulsecore/fdsem.h \  		pulsecore/object.c pulsecore/object.h \  		pulsecore/msgobject.c pulsecore/msgobject.h \ diff --git a/src/pulsecore/thread-mq.c b/src/pulsecore/thread-mq.c new file mode 100644 index 00000000..224a14cb --- /dev/null +++ b/src/pulsecore/thread-mq.c @@ -0,0 +1,119 @@ +/* $Id$ */ + +/*** +  This file is part of PulseAudio. + +  Copyright 2006 Lennart Poettering + +  PulseAudio is free software; you can redistribute it and/or modify +  it under the terms of the GNU Lesser General Public License as +  published by the Free Software Foundation; either version 2.1 of the +  License, or (at your option) any later version. + +  PulseAudio is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +  Lesser General Public License for more details. + +  You should have received a copy of the GNU Lesser General Public +  License along with PulseAudio; if not, write to the Free Software +  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +  USA. +***/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <unistd.h> +#include <errno.h> + +#include <pulse/xmalloc.h> + +#include <pulsecore/atomic.h> +#include <pulsecore/once.h> +#include <pulsecore/log.h> +#include <pulsecore/thread.h> +#include <pulsecore/semaphore.h> +#include <pulsecore/macro.h> +#include <pulsecore/core-util.h> +#include <pulsecore/flist.h> + +#include "thread-mq.h" + +static pa_once once = PA_ONCE_INIT; +static pa_tls *tls; + +static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { +    pa_thread_mq *q = userdata; + +    pa_assert(pa_asyncmsgq_get_fd(q->outq) == fd); +    pa_assert(events == PA_IO_EVENT_INPUT); + +    pa_asyncmsgq_after_poll(q->outq); + +    for (;;) { +        pa_msgobject *object; +        int code; +        void *data; +        int64_t offset; +        pa_memchunk chunk; + +        /* Check whether there is a message for us to process */ +        while (pa_asyncmsgq_get(q->outq, &object, &code, &data, &offset, &chunk, 0) == 0) { +            int ret; + +            ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); +            pa_asyncmsgq_done(q->outq, ret); +        } + +        if (pa_asyncmsgq_before_poll(q->outq) == 0) +            break; +    } +} + +void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) { +    pa_assert(q); +    pa_assert(mainloop); + +    q->mainloop = mainloop; +    pa_assert_se(q->inq = pa_asyncmsgq_new(0)); +    pa_assert_se(q->outq = pa_asyncmsgq_new(0)); +     +    pa_assert_se(pa_asyncmsgq_before_poll(q->outq) == 0); +    pa_assert_se(q->io_event = mainloop->io_new(mainloop, pa_asyncmsgq_get_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_cb, q)); +} + +void pa_thread_mq_done(pa_thread_mq *q) { +    pa_assert(q); + +    q->mainloop->io_free(q->io_event); +    q->io_event = NULL; + +    pa_asyncmsgq_after_poll(q->outq); +    pa_asyncmsgq_free(q->inq); +    pa_asyncmsgq_free(q->outq); +    q->inq = q->outq = NULL; +     +    q->mainloop = NULL; +} + +static void init_tls(void) { +    tls = pa_tls_new(NULL); +} + +void pa_thread_mq_install(pa_thread_mq *q) { +    pa_assert(q); + +    pa_run_once(&once, init_tls); +    pa_tls_set(tls, q); +} + +pa_thread_mq *pa_thread_mq_get(void) { +    pa_thread_mq *q; + +    pa_run_once(&once, init_tls); +    pa_assert_se(q = pa_tls_get(tls)); +    return q; +} + diff --git a/src/pulsecore/thread-mq.h b/src/pulsecore/thread-mq.h new file mode 100644 index 00000000..13b6e01f --- /dev/null +++ b/src/pulsecore/thread-mq.h @@ -0,0 +1,49 @@ +#ifndef foopulsethreadmqhfoo +#define foopulsethreadmqhfoo + +/* $Id$ */ + +/*** +  This file is part of PulseAudio. + +  Copyright 2004-2006 Lennart Poettering + +  PulseAudio is free software; you can redistribute it and/or modify +  it under the terms of the GNU Lesser General Public License as +  published by the Free Software Foundation; either version 2.1 of the +  License, or (at your option) any later version. + +  PulseAudio is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +  Lesser General Public License for more details. + +  You should have received a copy of the GNU Lesser General Public +  License along with PulseAudio; if not, write to the Free Software +  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +  USA. +***/ + +#include <pulse/mainloop-api.h> +#include <pulsecore/asyncmsgq.h> + +/* Two way communication between a thread and a mainloop. Before the + * thread is started a pa_pthread_mq should be initialized and than + * attached to the thread using pa_thread_mq_install(). */ + +typedef struct pa_thread_mq { +    pa_mainloop_api *mainloop; +    pa_asyncmsgq *inq, *outq; +    pa_io_event *io_event; +} pa_thread_mq; + +void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop); +void pa_thread_mq_done(pa_thread_mq *q); + +/* Install the specified pa_thread_mq object for the current thread */ +void pa_thread_mq_install(pa_thread_mq *q); + +/* Return the pa_thread_mq object that is set for the current thread */ +pa_thread_mq *pa_thread_mq_get(void); + +#endif | 
