summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPierre Ossman <ossman@cendio.se>2006-05-29 12:19:46 +0200
committerTakashi Iwai <tiwai@suse.de>2006-05-29 12:19:46 +0200
commite945a42f2c28c271a56ce0546d38f0c33c4b8125 (patch)
tree245157d31dbe9aa7427f52b782c0d2cf3cd60390
parenta80fb39fa7cfdf1a5b157b526d3eebe0cd73716e (diff)
Update Polypaudio plug-in to the 0.9.0 API
The new version of Polypaudio includes a threading abstraction that allows application of a more synchronous nature to use the API more easily. Using this, the complexity of the Polypaudio plug-in is greatly reduced and also removes the risk of stalling the communications layer. Signed-off-by: Pierre Ossman <ossman@cendio.se>
-rw-r--r--configure.in2
-rw-r--r--polyp/ctl_polyp.c141
-rw-r--r--polyp/pcm_polyp.c235
-rw-r--r--polyp/polyp.c324
-rw-r--r--polyp/polyp.h18
5 files changed, 308 insertions, 412 deletions
diff --git a/configure.in b/configure.in
index 178d1e4..9c779e2 100644
--- a/configure.in
+++ b/configure.in
@@ -16,7 +16,7 @@ AC_CHECK_LIB(asound, snd_pcm_ioplug_create,,
PKG_CHECK_MODULES(JACK, jack >= 0.98, [HAVE_JACK=yes], [HAVE_JACK=no])
AM_CONDITIONAL(HAVE_JACK, test x$HAVE_JACK = xyes)
-PKG_CHECK_MODULES(polypaudio, [polyplib], [HAVE_POLYP=yes], [HAVE_POLYP=no])
+PKG_CHECK_MODULES(polypaudio, [polyplib >= 0.9.0], [HAVE_POLYP=yes], [HAVE_POLYP=no])
AM_CONDITIONAL(HAVE_POLYP, test x$HAVE_POLYP = xyes)
PKG_CHECK_MODULES(samplerate, [samplerate], [HAVE_SAMPLERATE=yes], [HAVE_SAMPLERATE=no])
diff --git a/polyp/ctl_polyp.c b/polyp/ctl_polyp.c
index d27e8d9..dea3fb8 100644
--- a/polyp/ctl_polyp.c
+++ b/polyp/ctl_polyp.c
@@ -20,8 +20,6 @@
#include <sys/poll.h>
-#include <pthread.h>
-
#include <alsa/asoundlib.h>
#include <alsa/control_external.h>
@@ -43,8 +41,6 @@ typedef struct snd_ctl_polyp {
int subscribed;
int updated;
-
- pthread_mutex_t mutex;
} snd_ctl_polyp_t;
#define SOURCE_VOL_NAME "Capture Volume"
@@ -62,14 +58,19 @@ static void sink_info_cb(pa_context *c, const pa_sink_info *i, int is_last, void
snd_ctl_polyp_t *ctl = (snd_ctl_polyp_t*)userdata;
int chan;
- if (is_last)
+ assert(ctl);
+
+ if (is_last) {
+ pa_threaded_mainloop_signal(ctl->p->mainloop, 0);
return;
+ }
- assert(ctl && i);
+ assert(i);
if (!!ctl->sink_muted != !!i->mute) {
ctl->sink_muted = i->mute;
ctl->updated |= UPDATE_SINK_MUTE;
+ polyp_poll_activate(ctl->p);
}
if (ctl->sink_volume.channels == i->volume.channels) {
@@ -81,6 +82,7 @@ static void sink_info_cb(pa_context *c, const pa_sink_info *i, int is_last, void
return;
ctl->updated |= UPDATE_SINK_VOL;
+ polyp_poll_activate(ctl->p);
}
memcpy(&ctl->sink_volume, &i->volume, sizeof(pa_cvolume));
@@ -91,14 +93,19 @@ static void source_info_cb(pa_context *c, const pa_source_info *i, int is_last,
snd_ctl_polyp_t *ctl = (snd_ctl_polyp_t*)userdata;
int chan;
- if (is_last)
+ assert(ctl);
+
+ if (is_last) {
+ pa_threaded_mainloop_signal(ctl->p->mainloop, 0);
return;
+ }
- assert(ctl && i);
+ assert(i);
if (!!ctl->source_muted != !!i->mute) {
ctl->source_muted = i->mute;
ctl->updated |= UPDATE_SOURCE_MUTE;
+ polyp_poll_activate(ctl->p);
}
if (ctl->source_volume.channels == i->volume.channels) {
@@ -110,6 +117,7 @@ static void source_info_cb(pa_context *c, const pa_source_info *i, int is_last,
return;
ctl->updated |= UPDATE_SOURCE_VOL;
+ polyp_poll_activate(ctl->p);
}
memcpy(&ctl->source_volume, &i->volume, sizeof(pa_cvolume));
@@ -163,14 +171,14 @@ static int polyp_elem_count(snd_ctl_ext_t *ext)
assert(ctl);
- pthread_mutex_lock(&ctl->mutex);
+ pa_threaded_mainloop_lock(ctl->p->mainloop);
if (ctl->source)
count += 2;
if (ctl->sink)
count += 2;
- pthread_mutex_unlock(&ctl->mutex);
+ pa_threaded_mainloop_unlock(ctl->p->mainloop);
return count;
}
@@ -184,7 +192,7 @@ static int polyp_elem_list(snd_ctl_ext_t *ext, unsigned int offset,
snd_ctl_elem_id_set_interface(id, SND_CTL_ELEM_IFACE_MIXER);
- pthread_mutex_lock(&ctl->mutex);
+ pa_threaded_mainloop_lock(ctl->p->mainloop);
if (ctl->source) {
if (offset == 0)
@@ -194,7 +202,7 @@ static int polyp_elem_list(snd_ctl_ext_t *ext, unsigned int offset,
} else
offset += 2;
- pthread_mutex_unlock(&ctl->mutex);
+ pa_threaded_mainloop_unlock(ctl->p->mainloop);
if (offset == 2)
snd_ctl_elem_id_set_name(id, SINK_VOL_NAME);
@@ -229,18 +237,13 @@ static int polyp_get_attribute(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,
snd_ctl_polyp_t *ctl = ext->private_data;
int err = 0;
- assert(ctl);
-
if (key > 3)
return -EINVAL;
- pthread_mutex_lock(&ctl->mutex);
-
+ assert(ctl);
assert(ctl->p);
- err = polyp_finish_poll(ctl->p);
- if (err < 0)
- goto finish;
+ pa_threaded_mainloop_lock(ctl->p->mainloop);
err = polyp_check_connection(ctl->p);
if (err < 0)
@@ -265,7 +268,7 @@ static int polyp_get_attribute(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,
*count = 1;
finish:
- pthread_mutex_unlock(&ctl->mutex);
+ pa_threaded_mainloop_unlock(ctl->p->mainloop);
return err;
}
@@ -288,14 +291,9 @@ static int polyp_read_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,
pa_cvolume *vol = NULL;
assert(ctl);
-
- pthread_mutex_lock(&ctl->mutex);
-
assert(ctl->p);
- err = polyp_finish_poll(ctl->p);
- if (err < 0)
- goto finish;
+ pa_threaded_mainloop_lock(ctl->p->mainloop);
err = polyp_check_connection(ctl->p);
if (err < 0)
@@ -329,7 +327,7 @@ static int polyp_read_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,
}
finish:
- pthread_mutex_unlock(&ctl->mutex);
+ pa_threaded_mainloop_unlock(ctl->p->mainloop);
return err;
}
@@ -343,14 +341,9 @@ static int polyp_write_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,
pa_cvolume *vol = NULL;
assert(ctl);
-
- pthread_mutex_lock(&ctl->mutex);
-
assert(ctl->p && ctl->p->context);
- err = polyp_finish_poll(ctl->p);
- if (err < 0)
- goto finish;
+ pa_threaded_mainloop_lock(ctl->p->mainloop);
err = polyp_check_connection(ctl->p);
if (err < 0)
@@ -395,17 +388,17 @@ static int polyp_write_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,
if (key == 0)
o = pa_context_set_source_volume_by_name(ctl->p->context,
- ctl->source, vol, NULL, NULL);
+ ctl->source, vol, polyp_context_success_cb, ctl->p);
else
o = pa_context_set_sink_volume_by_name(ctl->p->context,
- ctl->sink, vol, NULL, NULL);
+ ctl->sink, vol, polyp_context_success_cb, ctl->p);
} else {
if (key == 1)
o = pa_context_set_source_mute_by_name(ctl->p->context,
- ctl->source, ctl->source_muted, NULL, NULL);
+ ctl->source, ctl->source_muted, polyp_context_success_cb, ctl->p);
else
o = pa_context_set_sink_mute_by_name(ctl->p->context,
- ctl->sink, ctl->sink_muted, NULL, NULL);
+ ctl->sink, ctl->sink_muted, polyp_context_success_cb, ctl->p);
}
err = polyp_wait_operation(ctl->p, o);
@@ -416,7 +409,7 @@ static int polyp_write_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,
err = 1;
finish:
- pthread_mutex_unlock(&ctl->mutex);
+ pa_threaded_mainloop_unlock(ctl->p->mainloop);
return err;
}
@@ -427,11 +420,11 @@ static void polyp_subscribe_events(snd_ctl_ext_t *ext, int subscribe)
assert(ctl);
- pthread_mutex_lock(&ctl->mutex);
+ pa_threaded_mainloop_lock(ctl->p->mainloop);
ctl->subscribed = !!(subscribe & SND_CTL_EVENT_MASK_VALUE);
- pthread_mutex_unlock(&ctl->mutex);
+ pa_threaded_mainloop_unlock(ctl->p->mainloop);
}
static int polyp_read_event(snd_ctl_ext_t *ext, snd_ctl_elem_id_t *id,
@@ -443,7 +436,7 @@ static int polyp_read_event(snd_ctl_ext_t *ext, snd_ctl_elem_id_t *id,
assert(ctl);
- pthread_mutex_lock(&ctl->mutex);
+ pa_threaded_mainloop_lock(ctl->p->mainloop);
if (!ctl->updated || !ctl->subscribed)
goto finish;
@@ -469,10 +462,13 @@ static int polyp_read_event(snd_ctl_ext_t *ext, snd_ctl_elem_id_t *id,
*event_mask = SND_CTL_EVENT_MASK_VALUE;
- err = 0;
+ if (!ctl->updated)
+ polyp_poll_deactivate(ctl->p);
+
+ err = 1;
finish:
- pthread_mutex_unlock(&ctl->mutex);
+ pa_threaded_mainloop_unlock(ctl->p->mainloop);
return err;
}
@@ -483,14 +479,13 @@ static int polyp_ctl_poll_descriptors_count(snd_ctl_ext_t *ext)
int count;
assert(ctl);
-
- pthread_mutex_lock(&ctl->mutex);
-
assert(ctl->p);
+ pa_threaded_mainloop_lock(ctl->p->mainloop);
+
count = polyp_poll_descriptors_count(ctl->p);
- pthread_mutex_unlock(&ctl->mutex);
+ pa_threaded_mainloop_unlock(ctl->p->mainloop);
return count;
}
@@ -502,20 +497,16 @@ static int polyp_ctl_poll_descriptors(snd_ctl_ext_t *ext, struct pollfd *pfd, un
snd_ctl_polyp_t *ctl = ext->private_data;
assert(ctl);
-
- pthread_mutex_lock(&ctl->mutex);
-
assert(ctl->p);
+ pa_threaded_mainloop_lock(ctl->p->mainloop);
+
num = polyp_poll_descriptors(ctl->p, pfd, space);
if (num < 0)
goto finish;
- if (ctl->updated)
- pa_mainloop_wakeup(ctl->p->mainloop);
-
finish:
- pthread_mutex_unlock(&ctl->mutex);
+ pa_threaded_mainloop_unlock(ctl->p->mainloop);
return num;
}
@@ -526,11 +517,10 @@ static int polyp_ctl_poll_revents(snd_ctl_ext_t *ext, struct pollfd *pfd, unsign
int err = 0;
assert(ctl);
-
- pthread_mutex_lock(&ctl->mutex);
-
assert(ctl->p);
+ pa_threaded_mainloop_lock(ctl->p->mainloop);
+
err = polyp_poll_revents(ctl->p, pfd, nfds, revents);
if (err < 0)
goto finish;
@@ -541,7 +531,7 @@ static int polyp_ctl_poll_revents(snd_ctl_ext_t *ext, struct pollfd *pfd, unsign
*revents |= POLLIN;
finish:
- pthread_mutex_unlock(&ctl->mutex);
+ pa_threaded_mainloop_unlock(ctl->p->mainloop);
return err;
}
@@ -560,8 +550,6 @@ static void polyp_close(snd_ctl_ext_t *ext)
if (ctl->sink)
free(ctl->sink);
- pthread_mutex_destroy(&ctl->mutex);
-
free(ctl);
}
@@ -591,6 +579,8 @@ static void server_info_cb(pa_context *c, const pa_server_info*i, void *userdata
ctl->source = strdup(i->default_source_name);
if (i->default_sink_name && !ctl->sink)
ctl->sink = strdup(i->default_sink_name);
+
+ pa_threaded_mainloop_signal(ctl->p->mainloop, 0);
}
SND_CTL_PLUGIN_DEFINE_FUNC(polyp)
@@ -603,7 +593,6 @@ SND_CTL_PLUGIN_DEFINE_FUNC(polyp)
int err;
snd_ctl_polyp_t *ctl;
pa_operation *o;
- pthread_mutexattr_t mutexattr;
snd_config_for_each(i, next, conf) {
snd_config_t *n = snd_config_iterator_entry(i);
@@ -647,21 +636,15 @@ SND_CTL_PLUGIN_DEFINE_FUNC(polyp)
ctl = calloc(1, sizeof(*ctl));
ctl->p = polyp_new();
- assert(ctl->p);
-
- err = polyp_connect(ctl->p, server);
- if (err < 0)
+ if (!ctl->p) {
+ err = -EIO;
goto error;
+ }
- err = polyp_start_thread(ctl->p);
+ err = polyp_connect(ctl->p, server);
if (err < 0)
goto error;
- pthread_mutexattr_init(&mutexattr);
- pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
- pthread_mutex_init(&ctl->mutex, &mutexattr);
- pthread_mutexattr_destroy(&mutexattr);
-
if (source)
ctl->source = strdup(source);
else if (device)
@@ -673,19 +656,33 @@ SND_CTL_PLUGIN_DEFINE_FUNC(polyp)
ctl->sink = strdup(device);
if (!ctl->source || !ctl->sink) {
+ pa_threaded_mainloop_lock(ctl->p->mainloop);
+
o = pa_context_get_server_info(ctl->p->context, server_info_cb, ctl);
err = polyp_wait_operation(ctl->p, o);
+
pa_operation_unref(o);
+
+ pa_threaded_mainloop_unlock(ctl->p->mainloop);
+
if (err < 0)
goto error;
}
+ pa_threaded_mainloop_lock(ctl->p->mainloop);
+
pa_context_set_subscribe_callback(ctl->p->context, event_cb, ctl);
o = pa_context_subscribe(ctl->p->context,
- PA_SUBSCRIPTION_MASK_SINK | PA_SUBSCRIPTION_MASK_SOURCE, NULL, NULL);
+ PA_SUBSCRIPTION_MASK_SINK | PA_SUBSCRIPTION_MASK_SOURCE,
+ polyp_context_success_cb, ctl->p);
+
err = polyp_wait_operation(ctl->p, o);
+
pa_operation_unref(o);
+
+ pa_threaded_mainloop_unlock(ctl->p->mainloop);
+
if (err < 0)
goto error;
diff --git a/polyp/pcm_polyp.c b/polyp/pcm_polyp.c
index 032ccbb..44ae35e 100644
--- a/polyp/pcm_polyp.c
+++ b/polyp/pcm_polyp.c
@@ -21,8 +21,6 @@
#include <stdio.h>
#include <sys/poll.h>
-#include <pthread.h>
-
#include <alsa/asoundlib.h>
#include <alsa/pcm_external.h>
@@ -46,8 +44,6 @@ typedef struct snd_pcm_polyp {
pa_sample_spec ss;
unsigned int frame_size;
pa_buffer_attr buffer_attr;
-
- pthread_mutex_t mutex;
} snd_pcm_polyp_t;
static void update_ptr(snd_pcm_polyp_t *pcm)
@@ -74,20 +70,17 @@ static int polyp_start(snd_pcm_ioplug_t *io)
int err = 0;
assert(pcm);
+ assert(pcm->p);
- pthread_mutex_lock(&pcm->mutex);
-
- assert(pcm->p && pcm->stream);
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
- err = polyp_finish_poll(pcm->p);
- if (err < 0)
- goto finish;
+ assert(pcm->stream);
err = polyp_check_connection(pcm->p);
if (err < 0)
goto finish;
- o = pa_stream_cork(pcm->stream, 0, NULL, NULL);
+ o = pa_stream_cork(pcm->stream, 0, polyp_stream_success_cb, pcm->p);
assert(o);
err = polyp_wait_operation(pcm->p, o);
@@ -100,7 +93,7 @@ static int polyp_start(snd_pcm_ioplug_t *io)
}
finish:
- pthread_mutex_unlock(&pcm->mutex);
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
return err;
}
@@ -112,20 +105,17 @@ static int polyp_stop(snd_pcm_ioplug_t *io)
int err = 0;
assert(pcm);
+ assert(pcm->p);
- pthread_mutex_lock(&pcm->mutex);
-
- assert(pcm->p && pcm->stream);
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
- err = polyp_finish_poll(pcm->p);
- if (err < 0)
- goto finish;
+ assert(pcm->stream);
err = polyp_check_connection(pcm->p);
if (err < 0)
goto finish;
- o = pa_stream_flush(pcm->stream, NULL, NULL);
+ o = pa_stream_flush(pcm->stream, polyp_stream_success_cb, pcm->p);
assert(o);
err = polyp_wait_operation(pcm->p, o);
@@ -137,7 +127,7 @@ static int polyp_stop(snd_pcm_ioplug_t *io)
goto finish;
}
- o = pa_stream_cork(pcm->stream, 1, NULL, NULL);
+ o = pa_stream_cork(pcm->stream, 1, polyp_stream_success_cb, pcm->p);
assert(o);
err = polyp_wait_operation(pcm->p, o);
@@ -150,7 +140,7 @@ static int polyp_stop(snd_pcm_ioplug_t *io)
}
finish:
- pthread_mutex_unlock(&pcm->mutex);
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
return err;
}
@@ -162,20 +152,17 @@ int polyp_drain(snd_pcm_ioplug_t *io)
int err = 0;
assert(pcm);
+ assert(pcm->p);
- pthread_mutex_lock(&pcm->mutex);
-
- assert(pcm->p && pcm->stream);
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
- err = polyp_finish_poll(pcm->p);
- if (err < 0)
- goto finish;
+ assert(pcm->stream);
err = polyp_check_connection(pcm->p);
if (err < 0)
goto finish;
- o = pa_stream_drain(pcm->stream, NULL, NULL);
+ o = pa_stream_drain(pcm->stream, polyp_stream_success_cb, pcm->p);
assert(o);
err = polyp_wait_operation(pcm->p, o);
@@ -188,7 +175,7 @@ int polyp_drain(snd_pcm_ioplug_t *io)
}
finish:
- pthread_mutex_unlock(&pcm->mutex);
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
return err;
}
@@ -199,14 +186,11 @@ static snd_pcm_sframes_t polyp_pointer(snd_pcm_ioplug_t *io)
int err = 0;
assert(pcm);
+ assert(pcm->p);
- pthread_mutex_lock(&pcm->mutex);
-
- assert(pcm->p && pcm->stream);
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
- err = polyp_finish_poll(pcm->p);
- if (err < 0)
- goto finish;
+ assert(pcm->stream);
err = polyp_check_connection(pcm->p);
if (err < 0)
@@ -214,14 +198,41 @@ static snd_pcm_sframes_t polyp_pointer(snd_pcm_ioplug_t *io)
update_ptr(pcm);
- err = polyp_start_poll(pcm->p);
+ err = snd_pcm_bytes_to_frames(io->pcm, pcm->ptr);
+
+finish:
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
+
+ return err;
+}
+
+static int polyp_delay(snd_pcm_ioplug_t *io,
+ snd_pcm_sframes_t *delayp)
+{
+ snd_pcm_polyp_t *pcm = io->private_data;
+ int err = 0;
+ pa_usec_t lat;
+
+ assert(pcm);
+ assert(pcm->p);
+
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
+
+ assert(pcm->stream);
+
+ err = polyp_check_connection(pcm->p);
if (err < 0)
goto finish;
- err = snd_pcm_bytes_to_frames(io->pcm, pcm->ptr);
+ if (pa_stream_get_latency(pcm->stream, &lat, NULL)) {
+ err = -EIO;
+ goto finish;
+ }
+
+ *delayp = snd_pcm_bytes_to_frames(io->pcm, pa_usec_to_bytes(lat, &pcm->ss));
finish:
- pthread_mutex_unlock(&pcm->mutex);
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
return err;
}
@@ -236,14 +247,11 @@ static snd_pcm_sframes_t polyp_write(snd_pcm_ioplug_t *io,
int err = 0;
assert(pcm);
+ assert(pcm->p);
- pthread_mutex_lock(&pcm->mutex);
-
- assert(pcm->p && pcm->stream);
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
- err = polyp_finish_poll(pcm->p);
- if (err < 0)
- goto finish;
+ assert(pcm->stream);
err = polyp_check_connection(pcm->p);
if (err < 0)
@@ -261,10 +269,13 @@ static snd_pcm_sframes_t polyp_write(snd_pcm_ioplug_t *io,
/* Make sure the buffer pointer is in sync */
update_ptr(pcm);
+ if (pcm->last_size < pcm->buffer_attr.minreq)
+ polyp_poll_deactivate(pcm->p);
+
err = size;
finish:
- pthread_mutex_unlock(&pcm->mutex);
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
return err;
}
@@ -280,14 +291,11 @@ static snd_pcm_sframes_t polyp_read(snd_pcm_ioplug_t *io,
int err = 0;
assert(pcm);
+ assert(pcm->p);
- pthread_mutex_lock(&pcm->mutex);
-
- assert(pcm->p && pcm->stream);
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
- err = polyp_finish_poll(pcm->p);
- if (err < 0)
- goto finish;
+ assert(pcm->stream);
err = polyp_check_connection(pcm->p);
if (err < 0)
@@ -325,28 +333,40 @@ static snd_pcm_sframes_t polyp_read(snd_pcm_ioplug_t *io,
/* Make sure the buffer pointer is in sync */
update_ptr(pcm);
+ if (pcm->last_size < pcm->buffer_attr.minreq)
+ polyp_poll_deactivate(pcm->p);
+
err = size - (remain_size / pcm->frame_size);
finish:
- pthread_mutex_unlock(&pcm->mutex);
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
return err;
}
+static void stream_request_cb(pa_stream *p, size_t length, void *userdata)
+{
+ snd_pcm_polyp_t *pcm = userdata;
+
+ assert(pcm);
+ assert(pcm->p);
+
+ polyp_poll_activate(pcm->p);
+}
+
static int polyp_pcm_poll_descriptors_count(snd_pcm_ioplug_t *io)
{
snd_pcm_polyp_t *pcm = io->private_data;
int count;
assert(pcm);
-
- pthread_mutex_lock(&pcm->mutex);
-
assert(pcm->p);
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
+
count = polyp_poll_descriptors_count(pcm->p);
- pthread_mutex_unlock(&pcm->mutex);
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
return count;
}
@@ -357,14 +377,13 @@ static int polyp_pcm_poll_descriptors(snd_pcm_ioplug_t *io, struct pollfd *pfd,
int err;
assert(pcm);
-
- pthread_mutex_lock(&pcm->mutex);
-
assert(pcm->p);
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
+
err = polyp_poll_descriptors(pcm->p, pfd, space);
- pthread_mutex_unlock(&pcm->mutex);
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
return err;
}
@@ -375,11 +394,10 @@ static int polyp_pcm_poll_revents(snd_pcm_ioplug_t *io, struct pollfd *pfd, unsi
int err = 0;
assert(pcm);
-
- pthread_mutex_lock(&pcm->mutex);
-
assert(pcm->p);
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
+
err = polyp_poll_revents(pcm->p, pfd, nfds, revents);
if (err < 0)
goto finish;
@@ -402,25 +420,21 @@ static int polyp_pcm_poll_revents(snd_pcm_ioplug_t *io, struct pollfd *pfd, unsi
}
finish:
- pthread_mutex_unlock(&pcm->mutex);
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
return err;
}
static int polyp_prepare(snd_pcm_ioplug_t *io)
{
+ pa_channel_map map;
snd_pcm_polyp_t *pcm = io->private_data;
int err = 0;
assert(pcm);
-
- pthread_mutex_lock(&pcm->mutex);
-
assert(pcm->p);
- err = polyp_finish_poll(pcm->p);
- if (err < 0)
- goto finish;
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
if (pcm->stream) {
pa_stream_disconnect(pcm->stream);
@@ -436,15 +450,24 @@ static int polyp_prepare(snd_pcm_ioplug_t *io)
assert(pcm->stream == NULL);
if (io->stream == SND_PCM_STREAM_PLAYBACK)
- pcm->stream = pa_stream_new(pcm->p->context, "ALSA Playback", &pcm->ss, NULL);
+ pcm->stream = pa_stream_new(pcm->p->context, "ALSA Playback", &pcm->ss,
+ pa_channel_map_init_auto(&map, pcm->ss.channels, PA_CHANNEL_MAP_ALSA));
else
- pcm->stream = pa_stream_new(pcm->p->context, "ALSA Capture", &pcm->ss, NULL);
+ pcm->stream = pa_stream_new(pcm->p->context, "ALSA Capture", &pcm->ss,
+ pa_channel_map_init_auto(&map, pcm->ss.channels, PA_CHANNEL_MAP_ALSA));
assert(pcm->stream);
- if (io->stream == SND_PCM_STREAM_PLAYBACK)
- pa_stream_connect_playback(pcm->stream, pcm->device, &pcm->buffer_attr, 0, NULL, NULL);
- else
- pa_stream_connect_record(pcm->stream, pcm->device, &pcm->buffer_attr, 0);
+ pa_stream_set_state_callback(pcm->stream, polyp_stream_state_cb, pcm->p);
+
+ if (io->stream == SND_PCM_STREAM_PLAYBACK) {
+ pa_stream_set_write_callback(pcm->stream, stream_request_cb, pcm);
+ pa_stream_connect_playback(pcm->stream, pcm->device, &pcm->buffer_attr,
+ PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING, NULL, NULL);
+ } else {
+ pa_stream_set_read_callback(pcm->stream, stream_request_cb, pcm);
+ pa_stream_connect_record(pcm->stream, pcm->device, &pcm->buffer_attr,
+ PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING);
+ }
err = polyp_wait_stream_state(pcm->p, pcm->stream, PA_STREAM_READY);
if (err < 0) {
@@ -459,7 +482,7 @@ static int polyp_prepare(snd_pcm_ioplug_t *io)
pcm->offset = 0;
finish:
- pthread_mutex_unlock(&pcm->mutex);
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
return err;
}
@@ -470,10 +493,11 @@ static int polyp_hw_params(snd_pcm_ioplug_t *io, snd_pcm_hw_params_t *params)
int err = 0;
assert(pcm);
+ assert(pcm->p);
- pthread_mutex_lock(&pcm->mutex);
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
- assert(pcm->p && !pcm->stream);
+ assert(!pcm->stream);
pcm->frame_size = (snd_pcm_format_physical_width(io->format) * io->channels) / 8;
@@ -516,7 +540,7 @@ static int polyp_hw_params(snd_pcm_ioplug_t *io, snd_pcm_hw_params_t *params)
pcm->buffer_attr.fragsize = io->period_size * pcm->frame_size;
finish:
- pthread_mutex_unlock(&pcm->mutex);
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
return err;
}
@@ -527,51 +551,55 @@ static int polyp_close(snd_pcm_ioplug_t *io)
assert(pcm);
+ pa_threaded_mainloop_lock(pcm->p->mainloop);
+
if (pcm->stream) {
pa_stream_disconnect(pcm->stream);
polyp_wait_stream_state(pcm->p, pcm->stream, PA_STREAM_TERMINATED);
pa_stream_unref(pcm->stream);
}
+ pa_threaded_mainloop_unlock(pcm->p->mainloop);
+
if (pcm->p)
polyp_free(pcm->p);
if (pcm->device)
free(pcm->device);
- pthread_mutex_destroy(&pcm->mutex);
-
free(pcm);
return 0;
}
static snd_pcm_ioplug_callback_t polyp_playback_callback = {
- .start = polyp_start,
- .stop = polyp_stop,
+ .start = polyp_start,
+ .stop = polyp_stop,
.drain = polyp_drain,
- .pointer = polyp_pointer,
- .transfer = polyp_write,
+ .pointer = polyp_pointer,
+ .transfer = polyp_write,
+ .delay = polyp_delay,
.poll_descriptors_count = polyp_pcm_poll_descriptors_count,
.poll_descriptors = polyp_pcm_poll_descriptors,
.poll_revents = polyp_pcm_poll_revents,
- .prepare = polyp_prepare,
- .hw_params = polyp_hw_params,
- .close = polyp_close,
+ .prepare = polyp_prepare,
+ .hw_params = polyp_hw_params,
+ .close = polyp_close,
};
static snd_pcm_ioplug_callback_t polyp_capture_callback = {
- .start = polyp_start,
- .stop = polyp_stop,
- .pointer = polyp_pointer,
- .transfer = polyp_read,
+ .start = polyp_start,
+ .stop = polyp_stop,
+ .pointer = polyp_pointer,
+ .transfer = polyp_read,
+ .delay = polyp_delay,
.poll_descriptors_count = polyp_pcm_poll_descriptors_count,
.poll_descriptors = polyp_pcm_poll_descriptors,
.poll_revents = polyp_pcm_poll_revents,
- .prepare = polyp_prepare,
- .hw_params = polyp_hw_params,
- .close = polyp_close,
+ .prepare = polyp_prepare,
+ .hw_params = polyp_hw_params,
+ .close = polyp_close,
};
@@ -645,7 +673,6 @@ SND_PCM_PLUGIN_DEFINE_FUNC(polyp)
const char *device = NULL;
int err;
snd_pcm_polyp_t *pcm;
- pthread_mutexattr_t mutexattr;
snd_config_for_each(i, next, conf) {
snd_config_t *n = snd_config_iterator_entry(i);
@@ -678,21 +705,15 @@ SND_PCM_PLUGIN_DEFINE_FUNC(polyp)
pcm->device = strdup(device);
pcm->p = polyp_new();
- assert(pcm->p);
-
- err = polyp_connect(pcm->p, server);
- if (err < 0)
+ if (!pcm->p) {
+ err = -EIO;
goto error;
+ }
- err = polyp_start_thread(pcm->p);
+ err = polyp_connect(pcm->p, server);
if (err < 0)
goto error;
- pthread_mutexattr_init(&mutexattr);
- pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
- pthread_mutex_init(&pcm->mutex, &mutexattr);
- pthread_mutexattr_destroy(&mutexattr);
-
pcm->io.version = SND_PCM_IOPLUG_VERSION;
pcm->io.name = "ALSA <-> Polypaudio PCM I/O Plugin";
pcm->io.poll_fd = -1;
diff --git a/polyp/polyp.c b/polyp/polyp.c
index f10221c..f5e5cac 100644
--- a/polyp/polyp.c
+++ b/polyp/polyp.c
@@ -19,177 +19,71 @@
*/
#include <stdio.h>
+#include <unistd.h>
#include <signal.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-
-#include <pthread.h>
+#include <sys/poll.h>
#include "polyp.h"
-enum {
- COMMAND_POLL = 'p',
- COMMAND_QUIT = 'q',
- COMMAND_POLL_DONE = 'P',
- COMMAND_POLL_FAILED = 'F',
-};
-
-static int write_command(snd_polyp_t *p, char command)
-{
- if (write(p->main_fd, &command, 1) != 1)
- return -errno;
- return 0;
-}
-
-static int write_reply(snd_polyp_t *p, char reply)
-{
- if (write(p->thread_fd, &reply, 1) != 1)
- return -errno;
- return 0;
-}
-
-static int read_command(snd_polyp_t *p)
-{
- char command;
-
- if (read(p->thread_fd, &command, 1) != 1)
- return -errno;
-
- return command;
-}
-
-static int read_reply(snd_polyp_t *p)
-{
- char reply;
-
- if (read(p->main_fd, &reply, 1) != 1)
- return -errno;
-
- return reply;
-}
-
-static void* thread_func(void *data)
+int polyp_check_connection(snd_polyp_t *p)
{
- snd_polyp_t *p = (snd_polyp_t*)data;
- sigset_t mask;
- char command;
- int ret;
-
- sigfillset(&mask);
- pthread_sigmask(SIG_BLOCK, &mask, NULL);
-
- do {
- command = read_command(p);
- if (command < 0)
- break;
+ pa_context_state_t state;
- switch (command) {
- case COMMAND_POLL:
- do {
- ret = pa_mainloop_poll(p->mainloop);
- } while ((ret < 0) && (errno == EINTR));
+ assert(p && p->context && p->mainloop);
- ret = write_reply(p, (ret < 0) ? COMMAND_POLL_FAILED : COMMAND_POLL_DONE);
- if (ret < 0)
- return NULL;
+ state = pa_context_get_state(p->context);
- break;
- }
- } while (command != COMMAND_QUIT);
+ if (state != PA_CONTEXT_READY)
+ return -EIO;
- return NULL;
+ return 0;
}
-int polyp_start_poll(snd_polyp_t *p)
+void polyp_stream_state_cb(pa_stream *s, void * userdata)
{
- int err;
+ snd_polyp_t *p = userdata;
+ assert(s);
assert(p);
- if (p->state == POLYP_STATE_POLLING)
- return 0;
-
- assert(p->state == POLYP_STATE_READY);
-
- err = pa_mainloop_prepare(p->mainloop, -1);
- if (err < 0)
- return err;
-
- err = write_command(p, COMMAND_POLL);
- if (err < 0)
- return err;
-
- p->state = POLYP_STATE_POLLING;
-
- return 0;
+ pa_threaded_mainloop_signal(p->mainloop, 0);
}
-int polyp_finish_poll(snd_polyp_t *p)
+void polyp_stream_success_cb(pa_stream *s, int success, void *userdata)
{
- char reply;
- int err;
+ snd_polyp_t *p = userdata;
+ assert(s);
assert(p);
- if (p->state == POLYP_STATE_READY)
- return 0;
-
- assert(p->state == POLYP_STATE_POLLING);
-
- p->state = POLYP_STATE_READY;
-
- pa_mainloop_wakeup(p->mainloop);
-
- reply = read_reply(p);
-
- if (reply == COMMAND_POLL_DONE) {
- err = pa_mainloop_dispatch(p->mainloop);
- if (err < 0)
- return err;
- } else
- return -EIO;
-
- return 0;
+ pa_threaded_mainloop_signal(p->mainloop, 0);
}
-int polyp_check_connection(snd_polyp_t *p)
+void polyp_context_success_cb(pa_context *c, int success, void *userdata)
{
- pa_context_state_t state;
-
- assert(p && p->context);
+ snd_polyp_t *p = userdata;
- state = pa_context_get_state(p->context);
-
- if (state != PA_CONTEXT_READY)
- return -EIO;
+ assert(c);
+ assert(p);
- return 0;
+ pa_threaded_mainloop_signal(p->mainloop, 0);
}
int polyp_wait_operation(snd_polyp_t *p, pa_operation *o)
{
- int err;
+ assert(p && o && (p->state == POLYP_STATE_READY) && p->mainloop);
- assert(p && o && (p->state == POLYP_STATE_READY));
-
- while (pa_operation_get_state(o) == PA_OPERATION_RUNNING) {
- p->state = POLYP_STATE_POLLING;
- err = pa_mainloop_iterate(p->mainloop, 1, NULL);
- p->state = POLYP_STATE_READY;
- if (err < 0)
- return err;
- }
+ while (pa_operation_get_state(o) == PA_OPERATION_RUNNING)
+ pa_threaded_mainloop_wait(p->mainloop);
return 0;
}
int polyp_wait_stream_state(snd_polyp_t *p, pa_stream *stream, pa_stream_state_t target)
{
- int err;
pa_stream_state_t state;
- assert(p && stream && (p->state == POLYP_STATE_READY));
+ assert(p && stream && (p->state == POLYP_STATE_READY) && p->mainloop);
while (1) {
state = pa_stream_get_state(stream);
@@ -200,34 +94,72 @@ int polyp_wait_stream_state(snd_polyp_t *p, pa_stream *stream, pa_stream_state_t
if (state == target)
break;
- p->state = POLYP_STATE_POLLING;
- err = pa_mainloop_iterate(p->mainloop, 1, NULL);
- p->state = POLYP_STATE_READY;
- if (err < 0)
- return -EIO;
+ pa_threaded_mainloop_wait(p->mainloop);
}
return 0;
}
+static void context_state_cb(pa_context *c, void *userdata) {
+ snd_polyp_t *p = userdata;
+ assert(c);
+
+ switch (pa_context_get_state(c)) {
+ case PA_CONTEXT_READY:
+ case PA_CONTEXT_TERMINATED:
+ case PA_CONTEXT_FAILED:
+ pa_threaded_mainloop_signal(p->mainloop, 0);
+ break;
+
+ case PA_CONTEXT_UNCONNECTED:
+ case PA_CONTEXT_CONNECTING:
+ case PA_CONTEXT_AUTHORIZING:
+ case PA_CONTEXT_SETTING_NAME:
+ break;
+ }
+}
+
snd_polyp_t *polyp_new()
{
snd_polyp_t *p;
+ int fd[2] = { -1, -1 };
+ char proc[PATH_MAX], buf[PATH_MAX + 20];
p = calloc(1, sizeof(snd_polyp_t));
assert(p);
p->state = POLYP_STATE_INIT;
- p->main_fd = -1;
- p->thread_fd = -1;
- p->thread_running = 0;
+ if (pipe(fd)) {
+ free(p);
+ return NULL;
+ }
+
+ p->main_fd = fd[0];
+ p->thread_fd = fd[1];
+
+ fcntl(fd[0], F_SETFL, O_NONBLOCK);
+ fcntl(fd[1], F_SETFL, O_NONBLOCK);
- p->mainloop = pa_mainloop_new();
+ signal(SIGPIPE, SIG_IGN); /* Yes, ugly as hell */
+
+ p->mainloop = pa_threaded_mainloop_new();
assert(p->mainloop);
- p->context = pa_context_new(pa_mainloop_get_api(p->mainloop),
- "ALSA Plugin");
+ if (pa_threaded_mainloop_start(p->mainloop) < 0) {
+ pa_threaded_mainloop_free(p->mainloop);
+ close(fd[0]);
+ close(fd[1]);
+ free(p);
+ return NULL;
+ }
+
+ if (pa_get_binary_name(proc, sizeof(proc)))
+ snprintf(buf, sizeof(buf), "ALSA plug-in [%s]", pa_path_get_filename(proc));
+ else
+ snprintf(buf, sizeof(buf), "ALSA plug-in");
+
+ p->context = pa_context_new(pa_threaded_mainloop_get_api(p->mainloop), buf);
assert(p->context);
return p;
@@ -235,22 +167,13 @@ snd_polyp_t *polyp_new()
void polyp_free(snd_polyp_t *p)
{
- if (p->thread_running) {
- assert(p->mainloop && p->thread);
- write_command(p, COMMAND_QUIT);
- pa_mainloop_wakeup(p->mainloop);
- pthread_join(p->thread, NULL);
- }
+ pa_threaded_mainloop_stop(p->mainloop);
- if (p->context)
- pa_context_unref(p->context);
- if (p->mainloop)
- pa_mainloop_free(p->mainloop);
+ pa_context_unref(p->context);
+ pa_threaded_mainloop_free(p->mainloop);
- if (p->thread_fd >= 0)
- close(p->thread_fd);
- if (p->main_fd >= 0)
- close(p->main_fd);
+ close(p->thread_fd);
+ close(p->main_fd);
free(p);
}
@@ -258,70 +181,52 @@ void polyp_free(snd_polyp_t *p)
int polyp_connect(snd_polyp_t *p, const char *server)
{
int err;
- pa_context_state_t state;
assert(p && p->context && p->mainloop && (p->state == POLYP_STATE_INIT));
+ pa_threaded_mainloop_lock(p->mainloop);
+
err = pa_context_connect(p->context, server, 0, NULL);
if (err < 0)
goto error;
- while (1) {
- state = pa_context_get_state(p->context);
+ pa_context_set_state_callback(p->context, context_state_cb, p);
- if (state == PA_CONTEXT_FAILED)
- goto error;
+ pa_threaded_mainloop_wait(p->mainloop);
- if (state == PA_CONTEXT_READY)
- break;
+ if (pa_context_get_state(p->context) != PA_CONTEXT_READY)
+ goto error;
- err = pa_mainloop_iterate(p->mainloop, 1, NULL);
- if (err < 0)
- return -EIO;
- }
+ pa_threaded_mainloop_unlock(p->mainloop);
- p->state = POLYP_STATE_CONNECTED;
+ p->state = POLYP_STATE_READY;
return 0;
error:
fprintf(stderr, "*** POLYPAUDIO: Unable to connect: %s\n",
pa_strerror(pa_context_errno(p->context)));
+
+ pa_threaded_mainloop_unlock(p->mainloop);
+
return -ECONNREFUSED;
}
-int polyp_start_thread(snd_polyp_t *p)
+void polyp_poll_activate(snd_polyp_t *p)
{
- int err;
- int fd[2] = { -1, -1 };
-
- assert(p && (p->state == POLYP_STATE_CONNECTED));
-
- if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd) < 0) {
- perror("socketpair()");
- return -errno;
- }
-
- p->thread_fd = fd[0];
- p->main_fd = fd[1];
-
- p->thread_running = 0;
+ assert(p);
- err = pthread_create(&p->thread, NULL, thread_func, p);
- if (err) {
- SNDERR("pthread_create(): %s", strerror(err));
- close(fd[0]);
- close(fd[1]);
- p->main_fd = -1;
- p->thread_fd = -1;
- return -err;
- }
+ write(p->thread_fd, "a", 1);
+}
- p->thread_running = 1;
+void polyp_poll_deactivate(snd_polyp_t *p)
+{
+ char buf[10];
- p->state = POLYP_STATE_READY;
+ assert(p);
- return 0;
+ /* Drain the pipe */
+ while (read(p->main_fd, buf, sizeof(buf)) > 0);
}
int polyp_poll_descriptors_count(snd_polyp_t *p)
@@ -336,22 +241,12 @@ int polyp_poll_descriptors_count(snd_polyp_t *p)
int polyp_poll_descriptors(snd_polyp_t *p, struct pollfd *pfd, unsigned int space)
{
- int err;
-
assert(p);
- err = polyp_finish_poll(p);
- if (err < 0)
- return err;
-
- err = polyp_start_poll(p);
- if (err < 0)
- return err;
-
assert(space >= 1);
pfd[0].fd = p->main_fd;
- pfd[0].events = POLL_IN;
+ pfd[0].events = POLLIN;
pfd[0].revents = 0;
return 1;
@@ -359,20 +254,7 @@ int polyp_poll_descriptors(snd_polyp_t *p, struct pollfd *pfd, unsigned int spac
int polyp_poll_revents(snd_polyp_t *p, struct pollfd *pfd, unsigned int nfds, unsigned short *revents)
{
- int err;
-
assert(p);
- err = polyp_finish_poll(p);
- if (err < 0)
- return err;
-
- err = polyp_check_connection(p);
- if (err < 0)
- return err;
-
- /*
- * The application might redo the poll immediatly.
- */
- return polyp_poll_descriptors(p, pfd, nfds);
+ return 1;
}
diff --git a/polyp/polyp.h b/polyp/polyp.h
index 8c2ab95..f210e18 100644
--- a/polyp/polyp.h
+++ b/polyp/polyp.h
@@ -21,32 +21,27 @@
#include <alsa/asoundlib.h>
#include <polyp/polypaudio.h>
-#include <polyp/mainloop.h>
#define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0]))
typedef struct snd_polyp {
- pa_mainloop *mainloop;
+ pa_threaded_mainloop *mainloop;
pa_context *context;
int thread_fd, main_fd;
- pthread_t thread;
- int thread_running;
-
enum {
POLYP_STATE_INIT,
- POLYP_STATE_CONNECTED,
POLYP_STATE_READY,
- POLYP_STATE_POLLING,
} state;
} snd_polyp_t;
-int polyp_start_poll(snd_polyp_t *p);
-int polyp_finish_poll(snd_polyp_t *p);
-
int polyp_check_connection(snd_polyp_t *p);
+void polyp_stream_state_cb(pa_stream *s, void * userdata);
+void polyp_stream_success_cb(pa_stream *s, int success, void *userdata);
+void polyp_context_success_cb(pa_context *c, int success, void *userdata);
+
int polyp_wait_operation(snd_polyp_t *p, pa_operation *o);
int polyp_wait_stream_state(snd_polyp_t *p, pa_stream *stream, pa_stream_state_t target);
@@ -54,8 +49,9 @@ snd_polyp_t *polyp_new();
void polyp_free(snd_polyp_t *p);
int polyp_connect(snd_polyp_t *p, const char *server);
-int polyp_start_thread(snd_polyp_t *p);
+void polyp_poll_activate(snd_polyp_t *p);
+void polyp_poll_deactivate(snd_polyp_t *p);
int polyp_poll_descriptors_count(snd_polyp_t *p);
int polyp_poll_descriptors(snd_polyp_t *p, struct pollfd *pfd, unsigned int space);
int polyp_poll_revents(snd_polyp_t *p, struct pollfd *pfd, unsigned int nfds, unsigned short *revents);