summaryrefslogtreecommitdiffstats
path: root/src/pulsecore/pstream.c
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2006-09-07 19:08:19 +0000
committerLennart Poettering <lennart@poettering.net>2006-09-07 19:08:19 +0000
commit1728e3ac982146c0ff2d5e46571aadda6937e4fc (patch)
tree45b52af6da7e53eccaa151a5dce35f90a6419b1e /src/pulsecore/pstream.c
parent0669c99fb6fedcd43af43466b7967b30b5af4f1d (diff)
make pa_stream thread-safe: use new refcounting system, protect access using mutexes
git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1379 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src/pulsecore/pstream.c')
-rw-r--r--src/pulsecore/pstream.c121
1 files changed, 93 insertions, 28 deletions
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 511972d9..566fb060 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -46,6 +46,8 @@
#include <pulsecore/log.h>
#include <pulsecore/core-scache.h>
#include <pulsecore/creds.h>
+#include <pulsecore/mutex.h>
+#include <pulsecore/refcnt.h>
#include "pstream.h"
@@ -108,12 +110,13 @@ struct item_info {
};
struct pa_pstream {
- int ref;
+ PA_REFCNT_DECLARE;
pa_mainloop_api *mainloop;
pa_defer_event *defer_event;
pa_iochannel *io;
pa_queue *send_queue;
+ pa_mutex *mutex;
int dead;
@@ -163,11 +166,14 @@ static int do_read(pa_pstream *p);
static void do_something(pa_pstream *p) {
assert(p);
-
- p->mainloop->defer_enable(p->defer_event, 0);
+ assert(PA_REFCNT_VALUE(p) > 0);
pa_pstream_ref(p);
+ pa_mutex_lock(p->mutex);
+
+ p->mainloop->defer_enable(p->defer_event, 0);
+
if (!p->dead && pa_iochannel_is_readable(p->io)) {
if (do_read(p) < 0)
goto fail;
@@ -179,6 +185,8 @@ static void do_something(pa_pstream *p) {
goto fail;
}
+ pa_mutex_unlock(p->mutex);
+
pa_pstream_unref(p);
return;
@@ -189,6 +197,8 @@ fail:
if (p->die_callback)
p->die_callback(p, p->die_callback_userdata);
+ pa_mutex_unlock(p->mutex);
+
pa_pstream_unref(p);
}
@@ -221,11 +231,13 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo
assert(pool);
p = pa_xnew(pa_pstream, 1);
- p->ref = 1;
+ PA_REFCNT_INIT(p);
p->io = io;
pa_iochannel_set_callback(io, io_callback, p);
p->dead = 0;
+ p->mutex = pa_mutex_new(1);
+
p->mainloop = m;
p->defer_event = m->defer_new(m, defer_callback, p);
m->defer_enable(p->defer_event, 0);
@@ -297,6 +309,9 @@ static void pstream_free(pa_pstream *p) {
if (p->read.packet)
pa_packet_unref(p->read.packet);
+ if (p->mutex)
+ pa_mutex_free(p->mutex);
+
pa_xfree(p);
}
@@ -304,11 +319,13 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre
struct item_info *i;
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
assert(packet);
+ pa_mutex_lock(p->mutex);
+
if (p->dead)
- return;
+ goto finish;
i = pa_xnew(struct item_info, 1);
i->type = PA_PSTREAM_ITEM_PACKET;
@@ -321,18 +338,24 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre
pa_queue_push(p->send_queue, i);
p->mainloop->defer_enable(p->defer_event, 1);
+
+finish:
+
+ pa_mutex_unlock(p->mutex);
}
void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
size_t length, idx;
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
assert(channel != (uint32_t) -1);
assert(chunk);
+ pa_mutex_lock(p->mutex);
+
if (p->dead)
- return;
+ goto finish;
length = chunk->length;
idx = 0;
@@ -363,6 +386,10 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa
}
p->mainloop->defer_enable(p->defer_event, 1);
+
+finish:
+
+ pa_mutex_unlock(p->mutex);
}
static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
@@ -370,10 +397,12 @@ static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userd
pa_pstream *p = userdata;
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
if (p->dead)
- return;
+ goto finish;
/* pa_log("Releasing block %u", block_id); */
@@ -386,6 +415,10 @@ static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userd
pa_queue_push(p->send_queue, item);
p->mainloop->defer_enable(p->defer_event, 1);
+
+finish:
+
+ pa_mutex_unlock(p->mutex);
}
static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
@@ -393,10 +426,12 @@ static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userda
pa_pstream *p = userdata;
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
if (p->dead)
- return;
+ goto finish;
/* pa_log("Revoking block %u", block_id); */
@@ -409,10 +444,15 @@ static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userda
pa_queue_push(p->send_queue, item);
p->mainloop->defer_enable(p->defer_event, 1);
+
+finish:
+
+ pa_mutex_unlock(p->mutex);
}
static void prepare_next_write_item(pa_pstream *p) {
assert(p);
+ assert(PA_REFCNT_VALUE(p) > 0);
if (!(p->write.current = pa_queue_pop(p->send_queue)))
return;
@@ -501,7 +541,9 @@ static int do_write(pa_pstream *p) {
void *d;
size_t l;
ssize_t r;
+
assert(p);
+ assert(PA_REFCNT_VALUE(p) > 0);
if (!p->write.current)
prepare_next_write_item(p);
@@ -552,8 +594,10 @@ static int do_read(pa_pstream *p) {
void *d;
size_t l;
ssize_t r;
+
assert(p);
-
+ assert(PA_REFCNT_VALUE(p) > 0);
+
if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
d = (uint8_t*) p->read.descriptor + p->read.index;
l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
@@ -782,65 +826,83 @@ frame_done:
void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
assert(p);
- assert(p->ref >= 1);
-
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
p->die_callback = cb;
p->die_callback_userdata = userdata;
+ pa_mutex_unlock(p->mutex);
}
-
void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
+ pa_mutex_lock(p->mutex);
p->drain_callback = cb;
p->drain_callback_userdata = userdata;
+ pa_mutex_unlock(p->mutex);
}
void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
+ pa_mutex_lock(p->mutex);
p->recieve_packet_callback = cb;
p->recieve_packet_callback_userdata = userdata;
+ pa_mutex_unlock(p->mutex);
}
void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
+ pa_mutex_lock(p->mutex);
p->recieve_memblock_callback = cb;
p->recieve_memblock_callback_userdata = userdata;
+ pa_mutex_unlock(p->mutex);
}
int pa_pstream_is_pending(pa_pstream *p) {
+ int b;
+
assert(p);
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
if (p->dead)
- return 0;
+ b = 0;
+ else
+ b = p->write.current || !pa_queue_is_empty(p->send_queue);
+
+ pa_mutex_unlock(p->mutex);
- return p->write.current || !pa_queue_is_empty(p->send_queue);
+ return b;
}
void pa_pstream_unref(pa_pstream*p) {
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
- if (--p->ref == 0)
+ if (PA_REFCNT_DEC(p) <= 0)
pstream_free(p);
}
pa_pstream* pa_pstream_ref(pa_pstream*p) {
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
- p->ref++;
+ PA_REFCNT_INC(p);
return p;
}
void pa_pstream_close(pa_pstream *p) {
assert(p);
+ pa_mutex_lock(p->mutex);
+
p->dead = 1;
if (p->import) {
@@ -868,12 +930,14 @@ void pa_pstream_close(pa_pstream *p) {
p->recieve_packet_callback = NULL;
p->recieve_memblock_callback = NULL;
-
+ pa_mutex_unlock(p->mutex);
}
void pa_pstream_use_shm(pa_pstream *p, int enable) {
assert(p);
- assert(p->ref >= 1);
+ assert(PA_REFCNT_VALUE(p) > 0);
+
+ pa_mutex_lock(p->mutex);
p->use_shm = enable;
@@ -888,6 +952,7 @@ void pa_pstream_use_shm(pa_pstream *p, int enable) {
pa_memexport_free(p->export);
p->export = NULL;
}
-
}
+
+ pa_mutex_unlock(p->mutex);
}