/*** This file is part of PulseAudio. Copyright 2006 Lennart Poettering PulseAudio is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. PulseAudio is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with PulseAudio; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. ***/ #ifdef HAVE_CONFIG_H #include #endif #include #include #include #include #include #include #include #include #include #include #include #include "asyncmsgq.h" PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree); PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free); struct asyncmsgq_item { int code; pa_msgobject *object; void *userdata; pa_free_cb_t free_cb; int64_t offset; pa_memchunk memchunk; pa_semaphore *semaphore; int ret; }; struct pa_asyncmsgq { PA_REFCNT_DECLARE; pa_asyncq *asyncq; pa_mutex *mutex; /* only for the writer side */ struct asyncmsgq_item *current; }; pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) { pa_asyncmsgq *a; a = pa_xnew(pa_asyncmsgq, 1); PA_REFCNT_INIT(a); pa_assert_se(a->asyncq = pa_asyncq_new(size)); pa_assert_se(a->mutex = pa_mutex_new(FALSE, TRUE)); a->current = NULL; return a; } static void asyncmsgq_free(pa_asyncmsgq *a) { struct asyncmsgq_item *i; pa_assert(a); while ((i = pa_asyncq_pop(a->asyncq, FALSE))) { pa_assert(!i->semaphore); if (i->object) pa_msgobject_unref(i->object); if (i->memchunk.memblock) pa_memblock_unref(i->memchunk.memblock); if (i->free_cb) i->free_cb(i->userdata); if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0) pa_xfree(i); } pa_asyncq_free(a->asyncq, NULL); pa_mutex_free(a->mutex); pa_xfree(a); } pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) { pa_assert(PA_REFCNT_VALUE(q) > 0); PA_REFCNT_INC(q); return q; } void pa_asyncmsgq_unref(pa_asyncmsgq* q) { pa_assert(PA_REFCNT_VALUE(q) > 0); if (PA_REFCNT_DEC(q) <= 0) asyncmsgq_free(q); } void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) { struct asyncmsgq_item *i; pa_assert(PA_REFCNT_VALUE(a) > 0); if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq)))) i = pa_xnew(struct asyncmsgq_item, 1); i->code = code; i->object = object ? pa_msgobject_ref(object) : NULL; i->userdata = (void*) userdata; i->free_cb = free_cb; i->offset = offset; if (chunk) { pa_assert(chunk->memblock); i->memchunk = *chunk; pa_memblock_ref(i->memchunk.memblock); } else pa_memchunk_reset(&i->memchunk); i->semaphore = NULL; /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */ pa_mutex_lock(a->mutex); pa_asyncq_post(a->asyncq, i); pa_mutex_unlock(a->mutex); } int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) { struct asyncmsgq_item i; pa_assert(PA_REFCNT_VALUE(a) > 0); i.code = code; i.object = object; i.userdata = (void*) userdata; i.free_cb = NULL; i.ret = -1; i.offset = offset; if (chunk) { pa_assert(chunk->memblock); i.memchunk = *chunk; } else pa_memchunk_reset(&i.memchunk); if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores)))) i.semaphore = pa_semaphore_new(0); pa_assert_se(i.semaphore); /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */ pa_mutex_lock(a->mutex); pa_assert_se(pa_asyncq_push(a->asyncq, &i, TRUE) == 0); pa_mutex_unlock(a->mutex); pa_semaphore_wait(i.semaphore); if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0) pa_semaphore_free(i.semaphore); return i.ret; } int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, pa_bool_t wait_op) { pa_assert(PA_REFCNT_VALUE(a) > 0); pa_assert(!a->current); if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) { /* pa_log("failure"); */ return -1; } /* pa_log("success"); */ if (code) *code = a->current->code; if (userdata) *userdata = a->current->userdata; if (offset) *offset = a->current->offset; if (object) { if ((*object = a->current->object)) pa_msgobject_assert_ref(*object); } if (chunk) *chunk = a->current->memchunk; /* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */ /* (void*) a, */ /* (void*) a->current->object, */ /* a->current->object ? a->current->object->parent.type_name : NULL, */ /* a->current->code, */ /* (void*) a->current->userdata, */ /* (unsigned long) a->current->memchunk.length); */ return 0; } void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) { pa_assert(PA_REFCNT_VALUE(a) > 0); pa_assert(a); pa_assert(a->current); if (a->current->semaphore) { a->current->ret = ret; pa_semaphore_post(a->current->semaphore); } else { if (a->current->free_cb) a->current->free_cb(a->current->userdata); if (a->current->object) pa_msgobject_unref(a->current->object); if (a->current->memchunk.memblock) pa_memblock_unref(a->current->memchunk.memblock); if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0) pa_xfree(a->current); } a->current = NULL; } int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) { int c; pa_assert(PA_REFCNT_VALUE(a) > 0); pa_asyncmsgq_ref(a); do { pa_msgobject *o; void *data; int64_t offset; pa_memchunk chunk; int ret; if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, TRUE) < 0) return -1; ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk); pa_asyncmsgq_done(a, ret); } while (c != code); pa_asyncmsgq_unref(a); return 0; } int pa_asyncmsgq_process_one(pa_asyncmsgq *a) { pa_msgobject *object; int code; void *data; pa_memchunk chunk; int64_t offset; int ret; pa_assert(PA_REFCNT_VALUE(a) > 0); if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, FALSE) < 0) return 0; pa_asyncmsgq_ref(a); ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); pa_asyncmsgq_done(a, ret); pa_asyncmsgq_unref(a); return 1; } int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) { pa_assert(PA_REFCNT_VALUE(a) > 0); return pa_asyncq_read_fd(a->asyncq); } int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) { pa_assert(PA_REFCNT_VALUE(a) > 0); return pa_asyncq_read_before_poll(a->asyncq); } void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) { pa_assert(PA_REFCNT_VALUE(a) > 0); pa_asyncq_read_after_poll(a->asyncq); } int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) { pa_assert(PA_REFCNT_VALUE(a) > 0); return pa_asyncq_write_fd(a->asyncq); } void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) { pa_assert(PA_REFCNT_VALUE(a) > 0); pa_asyncq_write_before_poll(a->asyncq); } void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) { pa_assert(PA_REFCNT_VALUE(a) > 0); pa_asyncq_write_after_poll(a->asyncq); } int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) { if (object) return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL); return 0; } void pa_asyncmsgq_flush(pa_asyncmsgq *a, pa_bool_t run) { pa_assert(PA_REFCNT_VALUE(a) > 0); for (;;) { pa_msgobject *object; int code; void *data; int64_t offset; pa_memchunk chunk; int ret; if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, FALSE) < 0) return; if (!run) { pa_asyncmsgq_done(a, -1); continue; } pa_asyncmsgq_ref(a); ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk); pa_asyncmsgq_done(a, ret); pa_asyncmsgq_unref(a); } } pa_bool_t pa_asyncmsgq_dispatching(pa_asyncmsgq *a) { pa_assert(PA_REFCNT_VALUE(a) > 0); return !!a->current; }