From 1728e3ac982146c0ff2d5e46571aadda6937e4fc Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Thu, 7 Sep 2006 19:08:19 +0000 Subject: make pa_stream thread-safe: use new refcounting system, protect access using mutexes git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1379 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/pstream.c | 121 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 93 insertions(+), 28 deletions(-) (limited to 'src/pulsecore/pstream.c') diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index 511972d9..566fb060 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -46,6 +46,8 @@ #include #include #include +#include +#include #include "pstream.h" @@ -108,12 +110,13 @@ struct item_info { }; struct pa_pstream { - int ref; + PA_REFCNT_DECLARE; pa_mainloop_api *mainloop; pa_defer_event *defer_event; pa_iochannel *io; pa_queue *send_queue; + pa_mutex *mutex; int dead; @@ -163,11 +166,14 @@ static int do_read(pa_pstream *p); static void do_something(pa_pstream *p) { assert(p); - - p->mainloop->defer_enable(p->defer_event, 0); + assert(PA_REFCNT_VALUE(p) > 0); pa_pstream_ref(p); + pa_mutex_lock(p->mutex); + + p->mainloop->defer_enable(p->defer_event, 0); + if (!p->dead && pa_iochannel_is_readable(p->io)) { if (do_read(p) < 0) goto fail; @@ -179,6 +185,8 @@ static void do_something(pa_pstream *p) { goto fail; } + pa_mutex_unlock(p->mutex); + pa_pstream_unref(p); return; @@ -189,6 +197,8 @@ fail: if (p->die_callback) p->die_callback(p, p->die_callback_userdata); + pa_mutex_unlock(p->mutex); + pa_pstream_unref(p); } @@ -221,11 +231,13 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo assert(pool); p = pa_xnew(pa_pstream, 1); - p->ref = 1; + PA_REFCNT_INIT(p); p->io = io; pa_iochannel_set_callback(io, io_callback, p); p->dead = 0; + p->mutex = pa_mutex_new(1); + p->mainloop = m; p->defer_event = m->defer_new(m, defer_callback, p); m->defer_enable(p->defer_event, 0); @@ -297,6 +309,9 @@ static void pstream_free(pa_pstream *p) { if (p->read.packet) pa_packet_unref(p->read.packet); + if (p->mutex) + pa_mutex_free(p->mutex); + pa_xfree(p); } @@ -304,11 +319,13 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre struct item_info *i; assert(p); - assert(p->ref >= 1); + assert(PA_REFCNT_VALUE(p) > 0); assert(packet); + pa_mutex_lock(p->mutex); + if (p->dead) - return; + goto finish; i = pa_xnew(struct item_info, 1); i->type = PA_PSTREAM_ITEM_PACKET; @@ -321,18 +338,24 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre pa_queue_push(p->send_queue, i); p->mainloop->defer_enable(p->defer_event, 1); + +finish: + + pa_mutex_unlock(p->mutex); } void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) { size_t length, idx; assert(p); - assert(p->ref >= 1); + assert(PA_REFCNT_VALUE(p) > 0); assert(channel != (uint32_t) -1); assert(chunk); + pa_mutex_lock(p->mutex); + if (p->dead) - return; + goto finish; length = chunk->length; idx = 0; @@ -363,6 +386,10 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa } p->mainloop->defer_enable(p->defer_event, 1); + +finish: + + pa_mutex_unlock(p->mutex); } static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) { @@ -370,10 +397,12 @@ static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userd pa_pstream *p = userdata; assert(p); - assert(p->ref >= 1); + assert(PA_REFCNT_VALUE(p) > 0); + + pa_mutex_lock(p->mutex); if (p->dead) - return; + goto finish; /* pa_log("Releasing block %u", block_id); */ @@ -386,6 +415,10 @@ static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userd pa_queue_push(p->send_queue, item); p->mainloop->defer_enable(p->defer_event, 1); + +finish: + + pa_mutex_unlock(p->mutex); } static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) { @@ -393,10 +426,12 @@ static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userda pa_pstream *p = userdata; assert(p); - assert(p->ref >= 1); + assert(PA_REFCNT_VALUE(p) > 0); + + pa_mutex_lock(p->mutex); if (p->dead) - return; + goto finish; /* pa_log("Revoking block %u", block_id); */ @@ -409,10 +444,15 @@ static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userda pa_queue_push(p->send_queue, item); p->mainloop->defer_enable(p->defer_event, 1); + +finish: + + pa_mutex_unlock(p->mutex); } static void prepare_next_write_item(pa_pstream *p) { assert(p); + assert(PA_REFCNT_VALUE(p) > 0); if (!(p->write.current = pa_queue_pop(p->send_queue))) return; @@ -501,7 +541,9 @@ static int do_write(pa_pstream *p) { void *d; size_t l; ssize_t r; + assert(p); + assert(PA_REFCNT_VALUE(p) > 0); if (!p->write.current) prepare_next_write_item(p); @@ -552,8 +594,10 @@ static int do_read(pa_pstream *p) { void *d; size_t l; ssize_t r; + assert(p); - + assert(PA_REFCNT_VALUE(p) > 0); + if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) { d = (uint8_t*) p->read.descriptor + p->read.index; l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index; @@ -782,65 +826,83 @@ frame_done: void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) { assert(p); - assert(p->ref >= 1); - + assert(PA_REFCNT_VALUE(p) > 0); + + pa_mutex_lock(p->mutex); p->die_callback = cb; p->die_callback_userdata = userdata; + pa_mutex_unlock(p->mutex); } - void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) { assert(p); - assert(p->ref >= 1); + assert(PA_REFCNT_VALUE(p) > 0); + pa_mutex_lock(p->mutex); p->drain_callback = cb; p->drain_callback_userdata = userdata; + pa_mutex_unlock(p->mutex); } void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) { assert(p); - assert(p->ref >= 1); + assert(PA_REFCNT_VALUE(p) > 0); + pa_mutex_lock(p->mutex); p->recieve_packet_callback = cb; p->recieve_packet_callback_userdata = userdata; + pa_mutex_unlock(p->mutex); } void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) { assert(p); - assert(p->ref >= 1); + assert(PA_REFCNT_VALUE(p) > 0); + pa_mutex_lock(p->mutex); p->recieve_memblock_callback = cb; p->recieve_memblock_callback_userdata = userdata; + pa_mutex_unlock(p->mutex); } int pa_pstream_is_pending(pa_pstream *p) { + int b; + assert(p); + assert(PA_REFCNT_VALUE(p) > 0); + + pa_mutex_lock(p->mutex); if (p->dead) - return 0; + b = 0; + else + b = p->write.current || !pa_queue_is_empty(p->send_queue); + + pa_mutex_unlock(p->mutex); - return p->write.current || !pa_queue_is_empty(p->send_queue); + return b; } void pa_pstream_unref(pa_pstream*p) { assert(p); - assert(p->ref >= 1); + assert(PA_REFCNT_VALUE(p) > 0); - if (--p->ref == 0) + if (PA_REFCNT_DEC(p) <= 0) pstream_free(p); } pa_pstream* pa_pstream_ref(pa_pstream*p) { assert(p); - assert(p->ref >= 1); + assert(PA_REFCNT_VALUE(p) > 0); - p->ref++; + PA_REFCNT_INC(p); return p; } void pa_pstream_close(pa_pstream *p) { assert(p); + pa_mutex_lock(p->mutex); + p->dead = 1; if (p->import) { @@ -868,12 +930,14 @@ void pa_pstream_close(pa_pstream *p) { p->recieve_packet_callback = NULL; p->recieve_memblock_callback = NULL; - + pa_mutex_unlock(p->mutex); } void pa_pstream_use_shm(pa_pstream *p, int enable) { assert(p); - assert(p->ref >= 1); + assert(PA_REFCNT_VALUE(p) > 0); + + pa_mutex_lock(p->mutex); p->use_shm = enable; @@ -888,6 +952,7 @@ void pa_pstream_use_shm(pa_pstream *p, int enable) { pa_memexport_free(p->export); p->export = NULL; } - } + + pa_mutex_unlock(p->mutex); } -- cgit