From e945a42f2c28c271a56ce0546d38f0c33c4b8125 Mon Sep 17 00:00:00 2001 From: Pierre Ossman Date: Mon, 29 May 2006 12:19:46 +0200 Subject: Update Polypaudio plug-in to the 0.9.0 API The new version of Polypaudio includes a threading abstraction that allows application of a more synchronous nature to use the API more easily. Using this, the complexity of the Polypaudio plug-in is greatly reduced and also removes the risk of stalling the communications layer. Signed-off-by: Pierre Ossman --- configure.in | 2 +- polyp/ctl_polyp.c | 141 ++++++++++++------------ polyp/pcm_polyp.c | 235 +++++++++++++++++++++------------------ polyp/polyp.c | 324 +++++++++++++++++------------------------------------- polyp/polyp.h | 18 ++- 5 files changed, 308 insertions(+), 412 deletions(-) diff --git a/configure.in b/configure.in index 178d1e4..9c779e2 100644 --- a/configure.in +++ b/configure.in @@ -16,7 +16,7 @@ AC_CHECK_LIB(asound, snd_pcm_ioplug_create,, PKG_CHECK_MODULES(JACK, jack >= 0.98, [HAVE_JACK=yes], [HAVE_JACK=no]) AM_CONDITIONAL(HAVE_JACK, test x$HAVE_JACK = xyes) -PKG_CHECK_MODULES(polypaudio, [polyplib], [HAVE_POLYP=yes], [HAVE_POLYP=no]) +PKG_CHECK_MODULES(polypaudio, [polyplib >= 0.9.0], [HAVE_POLYP=yes], [HAVE_POLYP=no]) AM_CONDITIONAL(HAVE_POLYP, test x$HAVE_POLYP = xyes) PKG_CHECK_MODULES(samplerate, [samplerate], [HAVE_SAMPLERATE=yes], [HAVE_SAMPLERATE=no]) diff --git a/polyp/ctl_polyp.c b/polyp/ctl_polyp.c index d27e8d9..dea3fb8 100644 --- a/polyp/ctl_polyp.c +++ b/polyp/ctl_polyp.c @@ -20,8 +20,6 @@ #include -#include - #include #include @@ -43,8 +41,6 @@ typedef struct snd_ctl_polyp { int subscribed; int updated; - - pthread_mutex_t mutex; } snd_ctl_polyp_t; #define SOURCE_VOL_NAME "Capture Volume" @@ -62,14 +58,19 @@ static void sink_info_cb(pa_context *c, const pa_sink_info *i, int is_last, void snd_ctl_polyp_t *ctl = (snd_ctl_polyp_t*)userdata; int chan; - if (is_last) + assert(ctl); + + if (is_last) { + pa_threaded_mainloop_signal(ctl->p->mainloop, 0); return; + } - assert(ctl && i); + assert(i); if (!!ctl->sink_muted != !!i->mute) { ctl->sink_muted = i->mute; ctl->updated |= UPDATE_SINK_MUTE; + polyp_poll_activate(ctl->p); } if (ctl->sink_volume.channels == i->volume.channels) { @@ -81,6 +82,7 @@ static void sink_info_cb(pa_context *c, const pa_sink_info *i, int is_last, void return; ctl->updated |= UPDATE_SINK_VOL; + polyp_poll_activate(ctl->p); } memcpy(&ctl->sink_volume, &i->volume, sizeof(pa_cvolume)); @@ -91,14 +93,19 @@ static void source_info_cb(pa_context *c, const pa_source_info *i, int is_last, snd_ctl_polyp_t *ctl = (snd_ctl_polyp_t*)userdata; int chan; - if (is_last) + assert(ctl); + + if (is_last) { + pa_threaded_mainloop_signal(ctl->p->mainloop, 0); return; + } - assert(ctl && i); + assert(i); if (!!ctl->source_muted != !!i->mute) { ctl->source_muted = i->mute; ctl->updated |= UPDATE_SOURCE_MUTE; + polyp_poll_activate(ctl->p); } if (ctl->source_volume.channels == i->volume.channels) { @@ -110,6 +117,7 @@ static void source_info_cb(pa_context *c, const pa_source_info *i, int is_last, return; ctl->updated |= UPDATE_SOURCE_VOL; + polyp_poll_activate(ctl->p); } memcpy(&ctl->source_volume, &i->volume, sizeof(pa_cvolume)); @@ -163,14 +171,14 @@ static int polyp_elem_count(snd_ctl_ext_t *ext) assert(ctl); - pthread_mutex_lock(&ctl->mutex); + pa_threaded_mainloop_lock(ctl->p->mainloop); if (ctl->source) count += 2; if (ctl->sink) count += 2; - pthread_mutex_unlock(&ctl->mutex); + pa_threaded_mainloop_unlock(ctl->p->mainloop); return count; } @@ -184,7 +192,7 @@ static int polyp_elem_list(snd_ctl_ext_t *ext, unsigned int offset, snd_ctl_elem_id_set_interface(id, SND_CTL_ELEM_IFACE_MIXER); - pthread_mutex_lock(&ctl->mutex); + pa_threaded_mainloop_lock(ctl->p->mainloop); if (ctl->source) { if (offset == 0) @@ -194,7 +202,7 @@ static int polyp_elem_list(snd_ctl_ext_t *ext, unsigned int offset, } else offset += 2; - pthread_mutex_unlock(&ctl->mutex); + pa_threaded_mainloop_unlock(ctl->p->mainloop); if (offset == 2) snd_ctl_elem_id_set_name(id, SINK_VOL_NAME); @@ -229,18 +237,13 @@ static int polyp_get_attribute(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, snd_ctl_polyp_t *ctl = ext->private_data; int err = 0; - assert(ctl); - if (key > 3) return -EINVAL; - pthread_mutex_lock(&ctl->mutex); - + assert(ctl); assert(ctl->p); - err = polyp_finish_poll(ctl->p); - if (err < 0) - goto finish; + pa_threaded_mainloop_lock(ctl->p->mainloop); err = polyp_check_connection(ctl->p); if (err < 0) @@ -265,7 +268,7 @@ static int polyp_get_attribute(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, *count = 1; finish: - pthread_mutex_unlock(&ctl->mutex); + pa_threaded_mainloop_unlock(ctl->p->mainloop); return err; } @@ -288,14 +291,9 @@ static int polyp_read_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, pa_cvolume *vol = NULL; assert(ctl); - - pthread_mutex_lock(&ctl->mutex); - assert(ctl->p); - err = polyp_finish_poll(ctl->p); - if (err < 0) - goto finish; + pa_threaded_mainloop_lock(ctl->p->mainloop); err = polyp_check_connection(ctl->p); if (err < 0) @@ -329,7 +327,7 @@ static int polyp_read_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, } finish: - pthread_mutex_unlock(&ctl->mutex); + pa_threaded_mainloop_unlock(ctl->p->mainloop); return err; } @@ -343,14 +341,9 @@ static int polyp_write_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, pa_cvolume *vol = NULL; assert(ctl); - - pthread_mutex_lock(&ctl->mutex); - assert(ctl->p && ctl->p->context); - err = polyp_finish_poll(ctl->p); - if (err < 0) - goto finish; + pa_threaded_mainloop_lock(ctl->p->mainloop); err = polyp_check_connection(ctl->p); if (err < 0) @@ -395,17 +388,17 @@ static int polyp_write_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, if (key == 0) o = pa_context_set_source_volume_by_name(ctl->p->context, - ctl->source, vol, NULL, NULL); + ctl->source, vol, polyp_context_success_cb, ctl->p); else o = pa_context_set_sink_volume_by_name(ctl->p->context, - ctl->sink, vol, NULL, NULL); + ctl->sink, vol, polyp_context_success_cb, ctl->p); } else { if (key == 1) o = pa_context_set_source_mute_by_name(ctl->p->context, - ctl->source, ctl->source_muted, NULL, NULL); + ctl->source, ctl->source_muted, polyp_context_success_cb, ctl->p); else o = pa_context_set_sink_mute_by_name(ctl->p->context, - ctl->sink, ctl->sink_muted, NULL, NULL); + ctl->sink, ctl->sink_muted, polyp_context_success_cb, ctl->p); } err = polyp_wait_operation(ctl->p, o); @@ -416,7 +409,7 @@ static int polyp_write_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, err = 1; finish: - pthread_mutex_unlock(&ctl->mutex); + pa_threaded_mainloop_unlock(ctl->p->mainloop); return err; } @@ -427,11 +420,11 @@ static void polyp_subscribe_events(snd_ctl_ext_t *ext, int subscribe) assert(ctl); - pthread_mutex_lock(&ctl->mutex); + pa_threaded_mainloop_lock(ctl->p->mainloop); ctl->subscribed = !!(subscribe & SND_CTL_EVENT_MASK_VALUE); - pthread_mutex_unlock(&ctl->mutex); + pa_threaded_mainloop_unlock(ctl->p->mainloop); } static int polyp_read_event(snd_ctl_ext_t *ext, snd_ctl_elem_id_t *id, @@ -443,7 +436,7 @@ static int polyp_read_event(snd_ctl_ext_t *ext, snd_ctl_elem_id_t *id, assert(ctl); - pthread_mutex_lock(&ctl->mutex); + pa_threaded_mainloop_lock(ctl->p->mainloop); if (!ctl->updated || !ctl->subscribed) goto finish; @@ -469,10 +462,13 @@ static int polyp_read_event(snd_ctl_ext_t *ext, snd_ctl_elem_id_t *id, *event_mask = SND_CTL_EVENT_MASK_VALUE; - err = 0; + if (!ctl->updated) + polyp_poll_deactivate(ctl->p); + + err = 1; finish: - pthread_mutex_unlock(&ctl->mutex); + pa_threaded_mainloop_unlock(ctl->p->mainloop); return err; } @@ -483,14 +479,13 @@ static int polyp_ctl_poll_descriptors_count(snd_ctl_ext_t *ext) int count; assert(ctl); - - pthread_mutex_lock(&ctl->mutex); - assert(ctl->p); + pa_threaded_mainloop_lock(ctl->p->mainloop); + count = polyp_poll_descriptors_count(ctl->p); - pthread_mutex_unlock(&ctl->mutex); + pa_threaded_mainloop_unlock(ctl->p->mainloop); return count; } @@ -502,20 +497,16 @@ static int polyp_ctl_poll_descriptors(snd_ctl_ext_t *ext, struct pollfd *pfd, un snd_ctl_polyp_t *ctl = ext->private_data; assert(ctl); - - pthread_mutex_lock(&ctl->mutex); - assert(ctl->p); + pa_threaded_mainloop_lock(ctl->p->mainloop); + num = polyp_poll_descriptors(ctl->p, pfd, space); if (num < 0) goto finish; - if (ctl->updated) - pa_mainloop_wakeup(ctl->p->mainloop); - finish: - pthread_mutex_unlock(&ctl->mutex); + pa_threaded_mainloop_unlock(ctl->p->mainloop); return num; } @@ -526,11 +517,10 @@ static int polyp_ctl_poll_revents(snd_ctl_ext_t *ext, struct pollfd *pfd, unsign int err = 0; assert(ctl); - - pthread_mutex_lock(&ctl->mutex); - assert(ctl->p); + pa_threaded_mainloop_lock(ctl->p->mainloop); + err = polyp_poll_revents(ctl->p, pfd, nfds, revents); if (err < 0) goto finish; @@ -541,7 +531,7 @@ static int polyp_ctl_poll_revents(snd_ctl_ext_t *ext, struct pollfd *pfd, unsign *revents |= POLLIN; finish: - pthread_mutex_unlock(&ctl->mutex); + pa_threaded_mainloop_unlock(ctl->p->mainloop); return err; } @@ -560,8 +550,6 @@ static void polyp_close(snd_ctl_ext_t *ext) if (ctl->sink) free(ctl->sink); - pthread_mutex_destroy(&ctl->mutex); - free(ctl); } @@ -591,6 +579,8 @@ static void server_info_cb(pa_context *c, const pa_server_info*i, void *userdata ctl->source = strdup(i->default_source_name); if (i->default_sink_name && !ctl->sink) ctl->sink = strdup(i->default_sink_name); + + pa_threaded_mainloop_signal(ctl->p->mainloop, 0); } SND_CTL_PLUGIN_DEFINE_FUNC(polyp) @@ -603,7 +593,6 @@ SND_CTL_PLUGIN_DEFINE_FUNC(polyp) int err; snd_ctl_polyp_t *ctl; pa_operation *o; - pthread_mutexattr_t mutexattr; snd_config_for_each(i, next, conf) { snd_config_t *n = snd_config_iterator_entry(i); @@ -647,21 +636,15 @@ SND_CTL_PLUGIN_DEFINE_FUNC(polyp) ctl = calloc(1, sizeof(*ctl)); ctl->p = polyp_new(); - assert(ctl->p); - - err = polyp_connect(ctl->p, server); - if (err < 0) + if (!ctl->p) { + err = -EIO; goto error; + } - err = polyp_start_thread(ctl->p); + err = polyp_connect(ctl->p, server); if (err < 0) goto error; - pthread_mutexattr_init(&mutexattr); - pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE); - pthread_mutex_init(&ctl->mutex, &mutexattr); - pthread_mutexattr_destroy(&mutexattr); - if (source) ctl->source = strdup(source); else if (device) @@ -673,19 +656,33 @@ SND_CTL_PLUGIN_DEFINE_FUNC(polyp) ctl->sink = strdup(device); if (!ctl->source || !ctl->sink) { + pa_threaded_mainloop_lock(ctl->p->mainloop); + o = pa_context_get_server_info(ctl->p->context, server_info_cb, ctl); err = polyp_wait_operation(ctl->p, o); + pa_operation_unref(o); + + pa_threaded_mainloop_unlock(ctl->p->mainloop); + if (err < 0) goto error; } + pa_threaded_mainloop_lock(ctl->p->mainloop); + pa_context_set_subscribe_callback(ctl->p->context, event_cb, ctl); o = pa_context_subscribe(ctl->p->context, - PA_SUBSCRIPTION_MASK_SINK | PA_SUBSCRIPTION_MASK_SOURCE, NULL, NULL); + PA_SUBSCRIPTION_MASK_SINK | PA_SUBSCRIPTION_MASK_SOURCE, + polyp_context_success_cb, ctl->p); + err = polyp_wait_operation(ctl->p, o); + pa_operation_unref(o); + + pa_threaded_mainloop_unlock(ctl->p->mainloop); + if (err < 0) goto error; diff --git a/polyp/pcm_polyp.c b/polyp/pcm_polyp.c index 032ccbb..44ae35e 100644 --- a/polyp/pcm_polyp.c +++ b/polyp/pcm_polyp.c @@ -21,8 +21,6 @@ #include #include -#include - #include #include @@ -46,8 +44,6 @@ typedef struct snd_pcm_polyp { pa_sample_spec ss; unsigned int frame_size; pa_buffer_attr buffer_attr; - - pthread_mutex_t mutex; } snd_pcm_polyp_t; static void update_ptr(snd_pcm_polyp_t *pcm) @@ -74,20 +70,17 @@ static int polyp_start(snd_pcm_ioplug_t *io) int err = 0; assert(pcm); + assert(pcm->p); - pthread_mutex_lock(&pcm->mutex); - - assert(pcm->p && pcm->stream); + pa_threaded_mainloop_lock(pcm->p->mainloop); - err = polyp_finish_poll(pcm->p); - if (err < 0) - goto finish; + assert(pcm->stream); err = polyp_check_connection(pcm->p); if (err < 0) goto finish; - o = pa_stream_cork(pcm->stream, 0, NULL, NULL); + o = pa_stream_cork(pcm->stream, 0, polyp_stream_success_cb, pcm->p); assert(o); err = polyp_wait_operation(pcm->p, o); @@ -100,7 +93,7 @@ static int polyp_start(snd_pcm_ioplug_t *io) } finish: - pthread_mutex_unlock(&pcm->mutex); + pa_threaded_mainloop_unlock(pcm->p->mainloop); return err; } @@ -112,20 +105,17 @@ static int polyp_stop(snd_pcm_ioplug_t *io) int err = 0; assert(pcm); + assert(pcm->p); - pthread_mutex_lock(&pcm->mutex); - - assert(pcm->p && pcm->stream); + pa_threaded_mainloop_lock(pcm->p->mainloop); - err = polyp_finish_poll(pcm->p); - if (err < 0) - goto finish; + assert(pcm->stream); err = polyp_check_connection(pcm->p); if (err < 0) goto finish; - o = pa_stream_flush(pcm->stream, NULL, NULL); + o = pa_stream_flush(pcm->stream, polyp_stream_success_cb, pcm->p); assert(o); err = polyp_wait_operation(pcm->p, o); @@ -137,7 +127,7 @@ static int polyp_stop(snd_pcm_ioplug_t *io) goto finish; } - o = pa_stream_cork(pcm->stream, 1, NULL, NULL); + o = pa_stream_cork(pcm->stream, 1, polyp_stream_success_cb, pcm->p); assert(o); err = polyp_wait_operation(pcm->p, o); @@ -150,7 +140,7 @@ static int polyp_stop(snd_pcm_ioplug_t *io) } finish: - pthread_mutex_unlock(&pcm->mutex); + pa_threaded_mainloop_unlock(pcm->p->mainloop); return err; } @@ -162,20 +152,17 @@ int polyp_drain(snd_pcm_ioplug_t *io) int err = 0; assert(pcm); + assert(pcm->p); - pthread_mutex_lock(&pcm->mutex); - - assert(pcm->p && pcm->stream); + pa_threaded_mainloop_lock(pcm->p->mainloop); - err = polyp_finish_poll(pcm->p); - if (err < 0) - goto finish; + assert(pcm->stream); err = polyp_check_connection(pcm->p); if (err < 0) goto finish; - o = pa_stream_drain(pcm->stream, NULL, NULL); + o = pa_stream_drain(pcm->stream, polyp_stream_success_cb, pcm->p); assert(o); err = polyp_wait_operation(pcm->p, o); @@ -188,7 +175,7 @@ int polyp_drain(snd_pcm_ioplug_t *io) } finish: - pthread_mutex_unlock(&pcm->mutex); + pa_threaded_mainloop_unlock(pcm->p->mainloop); return err; } @@ -199,14 +186,11 @@ static snd_pcm_sframes_t polyp_pointer(snd_pcm_ioplug_t *io) int err = 0; assert(pcm); + assert(pcm->p); - pthread_mutex_lock(&pcm->mutex); - - assert(pcm->p && pcm->stream); + pa_threaded_mainloop_lock(pcm->p->mainloop); - err = polyp_finish_poll(pcm->p); - if (err < 0) - goto finish; + assert(pcm->stream); err = polyp_check_connection(pcm->p); if (err < 0) @@ -214,14 +198,41 @@ static snd_pcm_sframes_t polyp_pointer(snd_pcm_ioplug_t *io) update_ptr(pcm); - err = polyp_start_poll(pcm->p); + err = snd_pcm_bytes_to_frames(io->pcm, pcm->ptr); + +finish: + pa_threaded_mainloop_unlock(pcm->p->mainloop); + + return err; +} + +static int polyp_delay(snd_pcm_ioplug_t *io, + snd_pcm_sframes_t *delayp) +{ + snd_pcm_polyp_t *pcm = io->private_data; + int err = 0; + pa_usec_t lat; + + assert(pcm); + assert(pcm->p); + + pa_threaded_mainloop_lock(pcm->p->mainloop); + + assert(pcm->stream); + + err = polyp_check_connection(pcm->p); if (err < 0) goto finish; - err = snd_pcm_bytes_to_frames(io->pcm, pcm->ptr); + if (pa_stream_get_latency(pcm->stream, &lat, NULL)) { + err = -EIO; + goto finish; + } + + *delayp = snd_pcm_bytes_to_frames(io->pcm, pa_usec_to_bytes(lat, &pcm->ss)); finish: - pthread_mutex_unlock(&pcm->mutex); + pa_threaded_mainloop_unlock(pcm->p->mainloop); return err; } @@ -236,14 +247,11 @@ static snd_pcm_sframes_t polyp_write(snd_pcm_ioplug_t *io, int err = 0; assert(pcm); + assert(pcm->p); - pthread_mutex_lock(&pcm->mutex); - - assert(pcm->p && pcm->stream); + pa_threaded_mainloop_lock(pcm->p->mainloop); - err = polyp_finish_poll(pcm->p); - if (err < 0) - goto finish; + assert(pcm->stream); err = polyp_check_connection(pcm->p); if (err < 0) @@ -261,10 +269,13 @@ static snd_pcm_sframes_t polyp_write(snd_pcm_ioplug_t *io, /* Make sure the buffer pointer is in sync */ update_ptr(pcm); + if (pcm->last_size < pcm->buffer_attr.minreq) + polyp_poll_deactivate(pcm->p); + err = size; finish: - pthread_mutex_unlock(&pcm->mutex); + pa_threaded_mainloop_unlock(pcm->p->mainloop); return err; } @@ -280,14 +291,11 @@ static snd_pcm_sframes_t polyp_read(snd_pcm_ioplug_t *io, int err = 0; assert(pcm); + assert(pcm->p); - pthread_mutex_lock(&pcm->mutex); - - assert(pcm->p && pcm->stream); + pa_threaded_mainloop_lock(pcm->p->mainloop); - err = polyp_finish_poll(pcm->p); - if (err < 0) - goto finish; + assert(pcm->stream); err = polyp_check_connection(pcm->p); if (err < 0) @@ -325,28 +333,40 @@ static snd_pcm_sframes_t polyp_read(snd_pcm_ioplug_t *io, /* Make sure the buffer pointer is in sync */ update_ptr(pcm); + if (pcm->last_size < pcm->buffer_attr.minreq) + polyp_poll_deactivate(pcm->p); + err = size - (remain_size / pcm->frame_size); finish: - pthread_mutex_unlock(&pcm->mutex); + pa_threaded_mainloop_unlock(pcm->p->mainloop); return err; } +static void stream_request_cb(pa_stream *p, size_t length, void *userdata) +{ + snd_pcm_polyp_t *pcm = userdata; + + assert(pcm); + assert(pcm->p); + + polyp_poll_activate(pcm->p); +} + static int polyp_pcm_poll_descriptors_count(snd_pcm_ioplug_t *io) { snd_pcm_polyp_t *pcm = io->private_data; int count; assert(pcm); - - pthread_mutex_lock(&pcm->mutex); - assert(pcm->p); + pa_threaded_mainloop_lock(pcm->p->mainloop); + count = polyp_poll_descriptors_count(pcm->p); - pthread_mutex_unlock(&pcm->mutex); + pa_threaded_mainloop_unlock(pcm->p->mainloop); return count; } @@ -357,14 +377,13 @@ static int polyp_pcm_poll_descriptors(snd_pcm_ioplug_t *io, struct pollfd *pfd, int err; assert(pcm); - - pthread_mutex_lock(&pcm->mutex); - assert(pcm->p); + pa_threaded_mainloop_lock(pcm->p->mainloop); + err = polyp_poll_descriptors(pcm->p, pfd, space); - pthread_mutex_unlock(&pcm->mutex); + pa_threaded_mainloop_unlock(pcm->p->mainloop); return err; } @@ -375,11 +394,10 @@ static int polyp_pcm_poll_revents(snd_pcm_ioplug_t *io, struct pollfd *pfd, unsi int err = 0; assert(pcm); - - pthread_mutex_lock(&pcm->mutex); - assert(pcm->p); + pa_threaded_mainloop_lock(pcm->p->mainloop); + err = polyp_poll_revents(pcm->p, pfd, nfds, revents); if (err < 0) goto finish; @@ -402,25 +420,21 @@ static int polyp_pcm_poll_revents(snd_pcm_ioplug_t *io, struct pollfd *pfd, unsi } finish: - pthread_mutex_unlock(&pcm->mutex); + pa_threaded_mainloop_unlock(pcm->p->mainloop); return err; } static int polyp_prepare(snd_pcm_ioplug_t *io) { + pa_channel_map map; snd_pcm_polyp_t *pcm = io->private_data; int err = 0; assert(pcm); - - pthread_mutex_lock(&pcm->mutex); - assert(pcm->p); - err = polyp_finish_poll(pcm->p); - if (err < 0) - goto finish; + pa_threaded_mainloop_lock(pcm->p->mainloop); if (pcm->stream) { pa_stream_disconnect(pcm->stream); @@ -436,15 +450,24 @@ static int polyp_prepare(snd_pcm_ioplug_t *io) assert(pcm->stream == NULL); if (io->stream == SND_PCM_STREAM_PLAYBACK) - pcm->stream = pa_stream_new(pcm->p->context, "ALSA Playback", &pcm->ss, NULL); + pcm->stream = pa_stream_new(pcm->p->context, "ALSA Playback", &pcm->ss, + pa_channel_map_init_auto(&map, pcm->ss.channels, PA_CHANNEL_MAP_ALSA)); else - pcm->stream = pa_stream_new(pcm->p->context, "ALSA Capture", &pcm->ss, NULL); + pcm->stream = pa_stream_new(pcm->p->context, "ALSA Capture", &pcm->ss, + pa_channel_map_init_auto(&map, pcm->ss.channels, PA_CHANNEL_MAP_ALSA)); assert(pcm->stream); - if (io->stream == SND_PCM_STREAM_PLAYBACK) - pa_stream_connect_playback(pcm->stream, pcm->device, &pcm->buffer_attr, 0, NULL, NULL); - else - pa_stream_connect_record(pcm->stream, pcm->device, &pcm->buffer_attr, 0); + pa_stream_set_state_callback(pcm->stream, polyp_stream_state_cb, pcm->p); + + if (io->stream == SND_PCM_STREAM_PLAYBACK) { + pa_stream_set_write_callback(pcm->stream, stream_request_cb, pcm); + pa_stream_connect_playback(pcm->stream, pcm->device, &pcm->buffer_attr, + PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING, NULL, NULL); + } else { + pa_stream_set_read_callback(pcm->stream, stream_request_cb, pcm); + pa_stream_connect_record(pcm->stream, pcm->device, &pcm->buffer_attr, + PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING); + } err = polyp_wait_stream_state(pcm->p, pcm->stream, PA_STREAM_READY); if (err < 0) { @@ -459,7 +482,7 @@ static int polyp_prepare(snd_pcm_ioplug_t *io) pcm->offset = 0; finish: - pthread_mutex_unlock(&pcm->mutex); + pa_threaded_mainloop_unlock(pcm->p->mainloop); return err; } @@ -470,10 +493,11 @@ static int polyp_hw_params(snd_pcm_ioplug_t *io, snd_pcm_hw_params_t *params) int err = 0; assert(pcm); + assert(pcm->p); - pthread_mutex_lock(&pcm->mutex); + pa_threaded_mainloop_lock(pcm->p->mainloop); - assert(pcm->p && !pcm->stream); + assert(!pcm->stream); pcm->frame_size = (snd_pcm_format_physical_width(io->format) * io->channels) / 8; @@ -516,7 +540,7 @@ static int polyp_hw_params(snd_pcm_ioplug_t *io, snd_pcm_hw_params_t *params) pcm->buffer_attr.fragsize = io->period_size * pcm->frame_size; finish: - pthread_mutex_unlock(&pcm->mutex); + pa_threaded_mainloop_unlock(pcm->p->mainloop); return err; } @@ -527,51 +551,55 @@ static int polyp_close(snd_pcm_ioplug_t *io) assert(pcm); + pa_threaded_mainloop_lock(pcm->p->mainloop); + if (pcm->stream) { pa_stream_disconnect(pcm->stream); polyp_wait_stream_state(pcm->p, pcm->stream, PA_STREAM_TERMINATED); pa_stream_unref(pcm->stream); } + pa_threaded_mainloop_unlock(pcm->p->mainloop); + if (pcm->p) polyp_free(pcm->p); if (pcm->device) free(pcm->device); - pthread_mutex_destroy(&pcm->mutex); - free(pcm); return 0; } static snd_pcm_ioplug_callback_t polyp_playback_callback = { - .start = polyp_start, - .stop = polyp_stop, + .start = polyp_start, + .stop = polyp_stop, .drain = polyp_drain, - .pointer = polyp_pointer, - .transfer = polyp_write, + .pointer = polyp_pointer, + .transfer = polyp_write, + .delay = polyp_delay, .poll_descriptors_count = polyp_pcm_poll_descriptors_count, .poll_descriptors = polyp_pcm_poll_descriptors, .poll_revents = polyp_pcm_poll_revents, - .prepare = polyp_prepare, - .hw_params = polyp_hw_params, - .close = polyp_close, + .prepare = polyp_prepare, + .hw_params = polyp_hw_params, + .close = polyp_close, }; static snd_pcm_ioplug_callback_t polyp_capture_callback = { - .start = polyp_start, - .stop = polyp_stop, - .pointer = polyp_pointer, - .transfer = polyp_read, + .start = polyp_start, + .stop = polyp_stop, + .pointer = polyp_pointer, + .transfer = polyp_read, + .delay = polyp_delay, .poll_descriptors_count = polyp_pcm_poll_descriptors_count, .poll_descriptors = polyp_pcm_poll_descriptors, .poll_revents = polyp_pcm_poll_revents, - .prepare = polyp_prepare, - .hw_params = polyp_hw_params, - .close = polyp_close, + .prepare = polyp_prepare, + .hw_params = polyp_hw_params, + .close = polyp_close, }; @@ -645,7 +673,6 @@ SND_PCM_PLUGIN_DEFINE_FUNC(polyp) const char *device = NULL; int err; snd_pcm_polyp_t *pcm; - pthread_mutexattr_t mutexattr; snd_config_for_each(i, next, conf) { snd_config_t *n = snd_config_iterator_entry(i); @@ -678,21 +705,15 @@ SND_PCM_PLUGIN_DEFINE_FUNC(polyp) pcm->device = strdup(device); pcm->p = polyp_new(); - assert(pcm->p); - - err = polyp_connect(pcm->p, server); - if (err < 0) + if (!pcm->p) { + err = -EIO; goto error; + } - err = polyp_start_thread(pcm->p); + err = polyp_connect(pcm->p, server); if (err < 0) goto error; - pthread_mutexattr_init(&mutexattr); - pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE); - pthread_mutex_init(&pcm->mutex, &mutexattr); - pthread_mutexattr_destroy(&mutexattr); - pcm->io.version = SND_PCM_IOPLUG_VERSION; pcm->io.name = "ALSA <-> Polypaudio PCM I/O Plugin"; pcm->io.poll_fd = -1; diff --git a/polyp/polyp.c b/polyp/polyp.c index f10221c..f5e5cac 100644 --- a/polyp/polyp.c +++ b/polyp/polyp.c @@ -19,177 +19,71 @@ */ #include +#include #include -#include -#include -#include - -#include +#include #include "polyp.h" -enum { - COMMAND_POLL = 'p', - COMMAND_QUIT = 'q', - COMMAND_POLL_DONE = 'P', - COMMAND_POLL_FAILED = 'F', -}; - -static int write_command(snd_polyp_t *p, char command) -{ - if (write(p->main_fd, &command, 1) != 1) - return -errno; - return 0; -} - -static int write_reply(snd_polyp_t *p, char reply) -{ - if (write(p->thread_fd, &reply, 1) != 1) - return -errno; - return 0; -} - -static int read_command(snd_polyp_t *p) -{ - char command; - - if (read(p->thread_fd, &command, 1) != 1) - return -errno; - - return command; -} - -static int read_reply(snd_polyp_t *p) -{ - char reply; - - if (read(p->main_fd, &reply, 1) != 1) - return -errno; - - return reply; -} - -static void* thread_func(void *data) +int polyp_check_connection(snd_polyp_t *p) { - snd_polyp_t *p = (snd_polyp_t*)data; - sigset_t mask; - char command; - int ret; - - sigfillset(&mask); - pthread_sigmask(SIG_BLOCK, &mask, NULL); - - do { - command = read_command(p); - if (command < 0) - break; + pa_context_state_t state; - switch (command) { - case COMMAND_POLL: - do { - ret = pa_mainloop_poll(p->mainloop); - } while ((ret < 0) && (errno == EINTR)); + assert(p && p->context && p->mainloop); - ret = write_reply(p, (ret < 0) ? COMMAND_POLL_FAILED : COMMAND_POLL_DONE); - if (ret < 0) - return NULL; + state = pa_context_get_state(p->context); - break; - } - } while (command != COMMAND_QUIT); + if (state != PA_CONTEXT_READY) + return -EIO; - return NULL; + return 0; } -int polyp_start_poll(snd_polyp_t *p) +void polyp_stream_state_cb(pa_stream *s, void * userdata) { - int err; + snd_polyp_t *p = userdata; + assert(s); assert(p); - if (p->state == POLYP_STATE_POLLING) - return 0; - - assert(p->state == POLYP_STATE_READY); - - err = pa_mainloop_prepare(p->mainloop, -1); - if (err < 0) - return err; - - err = write_command(p, COMMAND_POLL); - if (err < 0) - return err; - - p->state = POLYP_STATE_POLLING; - - return 0; + pa_threaded_mainloop_signal(p->mainloop, 0); } -int polyp_finish_poll(snd_polyp_t *p) +void polyp_stream_success_cb(pa_stream *s, int success, void *userdata) { - char reply; - int err; + snd_polyp_t *p = userdata; + assert(s); assert(p); - if (p->state == POLYP_STATE_READY) - return 0; - - assert(p->state == POLYP_STATE_POLLING); - - p->state = POLYP_STATE_READY; - - pa_mainloop_wakeup(p->mainloop); - - reply = read_reply(p); - - if (reply == COMMAND_POLL_DONE) { - err = pa_mainloop_dispatch(p->mainloop); - if (err < 0) - return err; - } else - return -EIO; - - return 0; + pa_threaded_mainloop_signal(p->mainloop, 0); } -int polyp_check_connection(snd_polyp_t *p) +void polyp_context_success_cb(pa_context *c, int success, void *userdata) { - pa_context_state_t state; - - assert(p && p->context); + snd_polyp_t *p = userdata; - state = pa_context_get_state(p->context); - - if (state != PA_CONTEXT_READY) - return -EIO; + assert(c); + assert(p); - return 0; + pa_threaded_mainloop_signal(p->mainloop, 0); } int polyp_wait_operation(snd_polyp_t *p, pa_operation *o) { - int err; + assert(p && o && (p->state == POLYP_STATE_READY) && p->mainloop); - assert(p && o && (p->state == POLYP_STATE_READY)); - - while (pa_operation_get_state(o) == PA_OPERATION_RUNNING) { - p->state = POLYP_STATE_POLLING; - err = pa_mainloop_iterate(p->mainloop, 1, NULL); - p->state = POLYP_STATE_READY; - if (err < 0) - return err; - } + while (pa_operation_get_state(o) == PA_OPERATION_RUNNING) + pa_threaded_mainloop_wait(p->mainloop); return 0; } int polyp_wait_stream_state(snd_polyp_t *p, pa_stream *stream, pa_stream_state_t target) { - int err; pa_stream_state_t state; - assert(p && stream && (p->state == POLYP_STATE_READY)); + assert(p && stream && (p->state == POLYP_STATE_READY) && p->mainloop); while (1) { state = pa_stream_get_state(stream); @@ -200,34 +94,72 @@ int polyp_wait_stream_state(snd_polyp_t *p, pa_stream *stream, pa_stream_state_t if (state == target) break; - p->state = POLYP_STATE_POLLING; - err = pa_mainloop_iterate(p->mainloop, 1, NULL); - p->state = POLYP_STATE_READY; - if (err < 0) - return -EIO; + pa_threaded_mainloop_wait(p->mainloop); } return 0; } +static void context_state_cb(pa_context *c, void *userdata) { + snd_polyp_t *p = userdata; + assert(c); + + switch (pa_context_get_state(c)) { + case PA_CONTEXT_READY: + case PA_CONTEXT_TERMINATED: + case PA_CONTEXT_FAILED: + pa_threaded_mainloop_signal(p->mainloop, 0); + break; + + case PA_CONTEXT_UNCONNECTED: + case PA_CONTEXT_CONNECTING: + case PA_CONTEXT_AUTHORIZING: + case PA_CONTEXT_SETTING_NAME: + break; + } +} + snd_polyp_t *polyp_new() { snd_polyp_t *p; + int fd[2] = { -1, -1 }; + char proc[PATH_MAX], buf[PATH_MAX + 20]; p = calloc(1, sizeof(snd_polyp_t)); assert(p); p->state = POLYP_STATE_INIT; - p->main_fd = -1; - p->thread_fd = -1; - p->thread_running = 0; + if (pipe(fd)) { + free(p); + return NULL; + } + + p->main_fd = fd[0]; + p->thread_fd = fd[1]; + + fcntl(fd[0], F_SETFL, O_NONBLOCK); + fcntl(fd[1], F_SETFL, O_NONBLOCK); - p->mainloop = pa_mainloop_new(); + signal(SIGPIPE, SIG_IGN); /* Yes, ugly as hell */ + + p->mainloop = pa_threaded_mainloop_new(); assert(p->mainloop); - p->context = pa_context_new(pa_mainloop_get_api(p->mainloop), - "ALSA Plugin"); + if (pa_threaded_mainloop_start(p->mainloop) < 0) { + pa_threaded_mainloop_free(p->mainloop); + close(fd[0]); + close(fd[1]); + free(p); + return NULL; + } + + if (pa_get_binary_name(proc, sizeof(proc))) + snprintf(buf, sizeof(buf), "ALSA plug-in [%s]", pa_path_get_filename(proc)); + else + snprintf(buf, sizeof(buf), "ALSA plug-in"); + + p->context = pa_context_new(pa_threaded_mainloop_get_api(p->mainloop), buf); assert(p->context); return p; @@ -235,22 +167,13 @@ snd_polyp_t *polyp_new() void polyp_free(snd_polyp_t *p) { - if (p->thread_running) { - assert(p->mainloop && p->thread); - write_command(p, COMMAND_QUIT); - pa_mainloop_wakeup(p->mainloop); - pthread_join(p->thread, NULL); - } + pa_threaded_mainloop_stop(p->mainloop); - if (p->context) - pa_context_unref(p->context); - if (p->mainloop) - pa_mainloop_free(p->mainloop); + pa_context_unref(p->context); + pa_threaded_mainloop_free(p->mainloop); - if (p->thread_fd >= 0) - close(p->thread_fd); - if (p->main_fd >= 0) - close(p->main_fd); + close(p->thread_fd); + close(p->main_fd); free(p); } @@ -258,70 +181,52 @@ void polyp_free(snd_polyp_t *p) int polyp_connect(snd_polyp_t *p, const char *server) { int err; - pa_context_state_t state; assert(p && p->context && p->mainloop && (p->state == POLYP_STATE_INIT)); + pa_threaded_mainloop_lock(p->mainloop); + err = pa_context_connect(p->context, server, 0, NULL); if (err < 0) goto error; - while (1) { - state = pa_context_get_state(p->context); + pa_context_set_state_callback(p->context, context_state_cb, p); - if (state == PA_CONTEXT_FAILED) - goto error; + pa_threaded_mainloop_wait(p->mainloop); - if (state == PA_CONTEXT_READY) - break; + if (pa_context_get_state(p->context) != PA_CONTEXT_READY) + goto error; - err = pa_mainloop_iterate(p->mainloop, 1, NULL); - if (err < 0) - return -EIO; - } + pa_threaded_mainloop_unlock(p->mainloop); - p->state = POLYP_STATE_CONNECTED; + p->state = POLYP_STATE_READY; return 0; error: fprintf(stderr, "*** POLYPAUDIO: Unable to connect: %s\n", pa_strerror(pa_context_errno(p->context))); + + pa_threaded_mainloop_unlock(p->mainloop); + return -ECONNREFUSED; } -int polyp_start_thread(snd_polyp_t *p) +void polyp_poll_activate(snd_polyp_t *p) { - int err; - int fd[2] = { -1, -1 }; - - assert(p && (p->state == POLYP_STATE_CONNECTED)); - - if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd) < 0) { - perror("socketpair()"); - return -errno; - } - - p->thread_fd = fd[0]; - p->main_fd = fd[1]; - - p->thread_running = 0; + assert(p); - err = pthread_create(&p->thread, NULL, thread_func, p); - if (err) { - SNDERR("pthread_create(): %s", strerror(err)); - close(fd[0]); - close(fd[1]); - p->main_fd = -1; - p->thread_fd = -1; - return -err; - } + write(p->thread_fd, "a", 1); +} - p->thread_running = 1; +void polyp_poll_deactivate(snd_polyp_t *p) +{ + char buf[10]; - p->state = POLYP_STATE_READY; + assert(p); - return 0; + /* Drain the pipe */ + while (read(p->main_fd, buf, sizeof(buf)) > 0); } int polyp_poll_descriptors_count(snd_polyp_t *p) @@ -336,22 +241,12 @@ int polyp_poll_descriptors_count(snd_polyp_t *p) int polyp_poll_descriptors(snd_polyp_t *p, struct pollfd *pfd, unsigned int space) { - int err; - assert(p); - err = polyp_finish_poll(p); - if (err < 0) - return err; - - err = polyp_start_poll(p); - if (err < 0) - return err; - assert(space >= 1); pfd[0].fd = p->main_fd; - pfd[0].events = POLL_IN; + pfd[0].events = POLLIN; pfd[0].revents = 0; return 1; @@ -359,20 +254,7 @@ int polyp_poll_descriptors(snd_polyp_t *p, struct pollfd *pfd, unsigned int spac int polyp_poll_revents(snd_polyp_t *p, struct pollfd *pfd, unsigned int nfds, unsigned short *revents) { - int err; - assert(p); - err = polyp_finish_poll(p); - if (err < 0) - return err; - - err = polyp_check_connection(p); - if (err < 0) - return err; - - /* - * The application might redo the poll immediatly. - */ - return polyp_poll_descriptors(p, pfd, nfds); + return 1; } diff --git a/polyp/polyp.h b/polyp/polyp.h index 8c2ab95..f210e18 100644 --- a/polyp/polyp.h +++ b/polyp/polyp.h @@ -21,32 +21,27 @@ #include #include -#include #define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0])) typedef struct snd_polyp { - pa_mainloop *mainloop; + pa_threaded_mainloop *mainloop; pa_context *context; int thread_fd, main_fd; - pthread_t thread; - int thread_running; - enum { POLYP_STATE_INIT, - POLYP_STATE_CONNECTED, POLYP_STATE_READY, - POLYP_STATE_POLLING, } state; } snd_polyp_t; -int polyp_start_poll(snd_polyp_t *p); -int polyp_finish_poll(snd_polyp_t *p); - int polyp_check_connection(snd_polyp_t *p); +void polyp_stream_state_cb(pa_stream *s, void * userdata); +void polyp_stream_success_cb(pa_stream *s, int success, void *userdata); +void polyp_context_success_cb(pa_context *c, int success, void *userdata); + int polyp_wait_operation(snd_polyp_t *p, pa_operation *o); int polyp_wait_stream_state(snd_polyp_t *p, pa_stream *stream, pa_stream_state_t target); @@ -54,8 +49,9 @@ snd_polyp_t *polyp_new(); void polyp_free(snd_polyp_t *p); int polyp_connect(snd_polyp_t *p, const char *server); -int polyp_start_thread(snd_polyp_t *p); +void polyp_poll_activate(snd_polyp_t *p); +void polyp_poll_deactivate(snd_polyp_t *p); int polyp_poll_descriptors_count(snd_polyp_t *p); int polyp_poll_descriptors(snd_polyp_t *p, struct pollfd *pfd, unsigned int space); int polyp_poll_revents(snd_polyp_t *p, struct pollfd *pfd, unsigned int nfds, unsigned short *revents); -- cgit