diff options
| author | Pierre Ossman <ossman@cendio.se> | 2006-05-29 12:19:46 +0200 | 
|---|---|---|
| committer | Takashi Iwai <tiwai@suse.de> | 2006-05-29 12:19:46 +0200 | 
| commit | e945a42f2c28c271a56ce0546d38f0c33c4b8125 (patch) | |
| tree | 245157d31dbe9aa7427f52b782c0d2cf3cd60390 | |
| parent | a80fb39fa7cfdf1a5b157b526d3eebe0cd73716e (diff) | |
Update Polypaudio plug-in to the 0.9.0 API
The new version of Polypaudio includes a threading abstraction that
allows application of a more synchronous nature to use the API more
easily. Using this, the complexity of the Polypaudio plug-in is greatly
reduced and also removes the risk of stalling the communications layer.
Signed-off-by: Pierre Ossman <ossman@cendio.se>
| -rw-r--r-- | configure.in | 2 | ||||
| -rw-r--r-- | polyp/ctl_polyp.c | 141 | ||||
| -rw-r--r-- | polyp/pcm_polyp.c | 235 | ||||
| -rw-r--r-- | polyp/polyp.c | 324 | ||||
| -rw-r--r-- | polyp/polyp.h | 18 | 
5 files changed, 308 insertions, 412 deletions
diff --git a/configure.in b/configure.in index 178d1e4..9c779e2 100644 --- a/configure.in +++ b/configure.in @@ -16,7 +16,7 @@ AC_CHECK_LIB(asound, snd_pcm_ioplug_create,,  PKG_CHECK_MODULES(JACK, jack >= 0.98, [HAVE_JACK=yes], [HAVE_JACK=no])  AM_CONDITIONAL(HAVE_JACK, test x$HAVE_JACK = xyes) -PKG_CHECK_MODULES(polypaudio, [polyplib], [HAVE_POLYP=yes], [HAVE_POLYP=no]) +PKG_CHECK_MODULES(polypaudio, [polyplib >= 0.9.0], [HAVE_POLYP=yes], [HAVE_POLYP=no])  AM_CONDITIONAL(HAVE_POLYP, test x$HAVE_POLYP = xyes)  PKG_CHECK_MODULES(samplerate, [samplerate], [HAVE_SAMPLERATE=yes], [HAVE_SAMPLERATE=no]) diff --git a/polyp/ctl_polyp.c b/polyp/ctl_polyp.c index d27e8d9..dea3fb8 100644 --- a/polyp/ctl_polyp.c +++ b/polyp/ctl_polyp.c @@ -20,8 +20,6 @@  #include <sys/poll.h> -#include <pthread.h> -  #include <alsa/asoundlib.h>  #include <alsa/control_external.h> @@ -43,8 +41,6 @@ typedef struct snd_ctl_polyp {      int subscribed;      int updated; - -    pthread_mutex_t mutex;  } snd_ctl_polyp_t;  #define SOURCE_VOL_NAME "Capture Volume" @@ -62,14 +58,19 @@ static void sink_info_cb(pa_context *c, const pa_sink_info *i, int is_last, void      snd_ctl_polyp_t *ctl = (snd_ctl_polyp_t*)userdata;      int chan; -    if (is_last) +    assert(ctl); + +    if (is_last) { +        pa_threaded_mainloop_signal(ctl->p->mainloop, 0);          return; +    } -    assert(ctl && i); +    assert(i);      if (!!ctl->sink_muted != !!i->mute) {          ctl->sink_muted = i->mute;          ctl->updated |= UPDATE_SINK_MUTE; +        polyp_poll_activate(ctl->p);      }      if (ctl->sink_volume.channels == i->volume.channels) { @@ -81,6 +82,7 @@ static void sink_info_cb(pa_context *c, const pa_sink_info *i, int is_last, void              return;          ctl->updated |= UPDATE_SINK_VOL; +        polyp_poll_activate(ctl->p);      }      memcpy(&ctl->sink_volume, &i->volume, sizeof(pa_cvolume)); @@ -91,14 +93,19 @@ static void source_info_cb(pa_context *c, const pa_source_info *i, int is_last,      snd_ctl_polyp_t *ctl = (snd_ctl_polyp_t*)userdata;      int chan; -    if (is_last) +    assert(ctl); + +    if (is_last) { +        pa_threaded_mainloop_signal(ctl->p->mainloop, 0);          return; +    } -    assert(ctl && i); +    assert(i);      if (!!ctl->source_muted != !!i->mute) {          ctl->source_muted = i->mute;          ctl->updated |= UPDATE_SOURCE_MUTE; +        polyp_poll_activate(ctl->p);      }      if (ctl->source_volume.channels == i->volume.channels) { @@ -110,6 +117,7 @@ static void source_info_cb(pa_context *c, const pa_source_info *i, int is_last,              return;          ctl->updated |= UPDATE_SOURCE_VOL; +        polyp_poll_activate(ctl->p);      }      memcpy(&ctl->source_volume, &i->volume, sizeof(pa_cvolume)); @@ -163,14 +171,14 @@ static int polyp_elem_count(snd_ctl_ext_t *ext)      assert(ctl); -    pthread_mutex_lock(&ctl->mutex); +    pa_threaded_mainloop_lock(ctl->p->mainloop);      if (ctl->source)          count += 2;      if (ctl->sink)          count += 2; -    pthread_mutex_unlock(&ctl->mutex); +    pa_threaded_mainloop_unlock(ctl->p->mainloop);      return count;  } @@ -184,7 +192,7 @@ static int polyp_elem_list(snd_ctl_ext_t *ext, unsigned int offset,      snd_ctl_elem_id_set_interface(id, SND_CTL_ELEM_IFACE_MIXER); -    pthread_mutex_lock(&ctl->mutex); +    pa_threaded_mainloop_lock(ctl->p->mainloop);      if (ctl->source) {          if (offset == 0) @@ -194,7 +202,7 @@ static int polyp_elem_list(snd_ctl_ext_t *ext, unsigned int offset,      } else          offset += 2; -    pthread_mutex_unlock(&ctl->mutex); +    pa_threaded_mainloop_unlock(ctl->p->mainloop);      if (offset == 2)          snd_ctl_elem_id_set_name(id, SINK_VOL_NAME); @@ -229,18 +237,13 @@ static int polyp_get_attribute(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,      snd_ctl_polyp_t *ctl = ext->private_data;      int err = 0; -    assert(ctl); -      if (key > 3)          return -EINVAL; -    pthread_mutex_lock(&ctl->mutex); - +    assert(ctl);      assert(ctl->p); -    err = polyp_finish_poll(ctl->p); -    if (err < 0) -        goto finish; +    pa_threaded_mainloop_lock(ctl->p->mainloop);      err = polyp_check_connection(ctl->p);      if (err < 0) @@ -265,7 +268,7 @@ static int polyp_get_attribute(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,          *count = 1;  finish: -    pthread_mutex_unlock(&ctl->mutex); +    pa_threaded_mainloop_unlock(ctl->p->mainloop);      return err;  } @@ -288,14 +291,9 @@ static int polyp_read_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,      pa_cvolume *vol = NULL;      assert(ctl); - -    pthread_mutex_lock(&ctl->mutex); -      assert(ctl->p); -    err = polyp_finish_poll(ctl->p); -    if (err < 0) -        goto finish; +    pa_threaded_mainloop_lock(ctl->p->mainloop);      err = polyp_check_connection(ctl->p);      if (err < 0) @@ -329,7 +327,7 @@ static int polyp_read_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,      }  finish: -    pthread_mutex_unlock(&ctl->mutex); +    pa_threaded_mainloop_unlock(ctl->p->mainloop);      return err;  } @@ -343,14 +341,9 @@ static int polyp_write_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,      pa_cvolume *vol = NULL;      assert(ctl); - -    pthread_mutex_lock(&ctl->mutex); -      assert(ctl->p && ctl->p->context); -    err = polyp_finish_poll(ctl->p); -    if (err < 0) -        goto finish; +    pa_threaded_mainloop_lock(ctl->p->mainloop);      err = polyp_check_connection(ctl->p);      if (err < 0) @@ -395,17 +388,17 @@ static int polyp_write_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,          if (key == 0)              o = pa_context_set_source_volume_by_name(ctl->p->context, -                ctl->source, vol, NULL, NULL); +                ctl->source, vol, polyp_context_success_cb, ctl->p);          else              o = pa_context_set_sink_volume_by_name(ctl->p->context, -                ctl->sink, vol, NULL, NULL); +                ctl->sink, vol, polyp_context_success_cb, ctl->p);      } else {          if (key == 1)              o = pa_context_set_source_mute_by_name(ctl->p->context, -                ctl->source, ctl->source_muted, NULL, NULL); +                ctl->source, ctl->source_muted, polyp_context_success_cb, ctl->p);          else              o = pa_context_set_sink_mute_by_name(ctl->p->context, -                ctl->sink, ctl->sink_muted, NULL, NULL); +                ctl->sink, ctl->sink_muted, polyp_context_success_cb, ctl->p);      }      err = polyp_wait_operation(ctl->p, o); @@ -416,7 +409,7 @@ static int polyp_write_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key,      err = 1;  finish: -    pthread_mutex_unlock(&ctl->mutex); +    pa_threaded_mainloop_unlock(ctl->p->mainloop);      return err;  } @@ -427,11 +420,11 @@ static void polyp_subscribe_events(snd_ctl_ext_t *ext, int subscribe)      assert(ctl); -    pthread_mutex_lock(&ctl->mutex); +    pa_threaded_mainloop_lock(ctl->p->mainloop);      ctl->subscribed = !!(subscribe & SND_CTL_EVENT_MASK_VALUE); -    pthread_mutex_unlock(&ctl->mutex); +    pa_threaded_mainloop_unlock(ctl->p->mainloop);  }  static int polyp_read_event(snd_ctl_ext_t *ext, snd_ctl_elem_id_t *id, @@ -443,7 +436,7 @@ static int polyp_read_event(snd_ctl_ext_t *ext, snd_ctl_elem_id_t *id,      assert(ctl); -    pthread_mutex_lock(&ctl->mutex); +    pa_threaded_mainloop_lock(ctl->p->mainloop);      if (!ctl->updated || !ctl->subscribed)          goto finish; @@ -469,10 +462,13 @@ static int polyp_read_event(snd_ctl_ext_t *ext, snd_ctl_elem_id_t *id,      *event_mask = SND_CTL_EVENT_MASK_VALUE; -    err = 0; +    if (!ctl->updated) +        polyp_poll_deactivate(ctl->p); + +    err = 1;  finish: -    pthread_mutex_unlock(&ctl->mutex); +    pa_threaded_mainloop_unlock(ctl->p->mainloop);      return err;  } @@ -483,14 +479,13 @@ static int polyp_ctl_poll_descriptors_count(snd_ctl_ext_t *ext)  	int count;      assert(ctl); - -    pthread_mutex_lock(&ctl->mutex); -      assert(ctl->p); +    pa_threaded_mainloop_lock(ctl->p->mainloop); +      count = polyp_poll_descriptors_count(ctl->p); -    pthread_mutex_unlock(&ctl->mutex); +    pa_threaded_mainloop_unlock(ctl->p->mainloop);      return count;  } @@ -502,20 +497,16 @@ static int polyp_ctl_poll_descriptors(snd_ctl_ext_t *ext, struct pollfd *pfd, un  	snd_ctl_polyp_t *ctl = ext->private_data;      assert(ctl); - -    pthread_mutex_lock(&ctl->mutex); -      assert(ctl->p); +    pa_threaded_mainloop_lock(ctl->p->mainloop); +      num = polyp_poll_descriptors(ctl->p, pfd, space);      if (num < 0)          goto finish; -    if (ctl->updated) -        pa_mainloop_wakeup(ctl->p->mainloop); -  finish: -    pthread_mutex_unlock(&ctl->mutex); +    pa_threaded_mainloop_unlock(ctl->p->mainloop);      return num;  } @@ -526,11 +517,10 @@ static int polyp_ctl_poll_revents(snd_ctl_ext_t *ext, struct pollfd *pfd, unsign  	int err = 0;      assert(ctl); - -    pthread_mutex_lock(&ctl->mutex); -      assert(ctl->p); +    pa_threaded_mainloop_lock(ctl->p->mainloop); +      err = polyp_poll_revents(ctl->p, pfd, nfds, revents);      if (err < 0)          goto finish; @@ -541,7 +531,7 @@ static int polyp_ctl_poll_revents(snd_ctl_ext_t *ext, struct pollfd *pfd, unsign          *revents |= POLLIN;  finish: -    pthread_mutex_unlock(&ctl->mutex); +    pa_threaded_mainloop_unlock(ctl->p->mainloop);      return err;  } @@ -560,8 +550,6 @@ static void polyp_close(snd_ctl_ext_t *ext)      if (ctl->sink)          free(ctl->sink); -    pthread_mutex_destroy(&ctl->mutex); -  	free(ctl);  } @@ -591,6 +579,8 @@ static void server_info_cb(pa_context *c, const pa_server_info*i, void *userdata          ctl->source = strdup(i->default_source_name);      if (i->default_sink_name && !ctl->sink)          ctl->sink = strdup(i->default_sink_name); + +    pa_threaded_mainloop_signal(ctl->p->mainloop, 0);  }  SND_CTL_PLUGIN_DEFINE_FUNC(polyp) @@ -603,7 +593,6 @@ SND_CTL_PLUGIN_DEFINE_FUNC(polyp)  	int err;  	snd_ctl_polyp_t *ctl;      pa_operation *o; -    pthread_mutexattr_t mutexattr;  	snd_config_for_each(i, next, conf) {  		snd_config_t *n = snd_config_iterator_entry(i); @@ -647,21 +636,15 @@ SND_CTL_PLUGIN_DEFINE_FUNC(polyp)  	ctl = calloc(1, sizeof(*ctl));      ctl->p = polyp_new(); -    assert(ctl->p); - -    err = polyp_connect(ctl->p, server); -    if (err < 0) +    if (!ctl->p) { +        err = -EIO;          goto error; +    } -    err = polyp_start_thread(ctl->p); +    err = polyp_connect(ctl->p, server);      if (err < 0)          goto error; -    pthread_mutexattr_init(&mutexattr); -    pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE); -    pthread_mutex_init(&ctl->mutex, &mutexattr); -    pthread_mutexattr_destroy(&mutexattr); -      if (source)          ctl->source = strdup(source);      else if (device) @@ -673,19 +656,33 @@ SND_CTL_PLUGIN_DEFINE_FUNC(polyp)          ctl->sink = strdup(device);      if (!ctl->source || !ctl->sink) { +        pa_threaded_mainloop_lock(ctl->p->mainloop); +          o = pa_context_get_server_info(ctl->p->context, server_info_cb, ctl);          err = polyp_wait_operation(ctl->p, o); +          pa_operation_unref(o); + +        pa_threaded_mainloop_unlock(ctl->p->mainloop); +          if (err < 0)              goto error;      } +    pa_threaded_mainloop_lock(ctl->p->mainloop); +      pa_context_set_subscribe_callback(ctl->p->context, event_cb, ctl);      o = pa_context_subscribe(ctl->p->context, -        PA_SUBSCRIPTION_MASK_SINK | PA_SUBSCRIPTION_MASK_SOURCE, NULL, NULL); +        PA_SUBSCRIPTION_MASK_SINK | PA_SUBSCRIPTION_MASK_SOURCE, +        polyp_context_success_cb, ctl->p); +      err = polyp_wait_operation(ctl->p, o); +      pa_operation_unref(o); + +    pa_threaded_mainloop_unlock(ctl->p->mainloop); +      if (err < 0)          goto error; diff --git a/polyp/pcm_polyp.c b/polyp/pcm_polyp.c index 032ccbb..44ae35e 100644 --- a/polyp/pcm_polyp.c +++ b/polyp/pcm_polyp.c @@ -21,8 +21,6 @@  #include <stdio.h>  #include <sys/poll.h> -#include <pthread.h> -  #include <alsa/asoundlib.h>  #include <alsa/pcm_external.h> @@ -46,8 +44,6 @@ typedef struct snd_pcm_polyp {      pa_sample_spec ss;      unsigned int frame_size;      pa_buffer_attr buffer_attr; - -    pthread_mutex_t mutex;  } snd_pcm_polyp_t;  static void update_ptr(snd_pcm_polyp_t *pcm) @@ -74,20 +70,17 @@ static int polyp_start(snd_pcm_ioplug_t *io)  	int err = 0;      assert(pcm); +    assert(pcm->p); -    pthread_mutex_lock(&pcm->mutex); - -    assert(pcm->p && pcm->stream); +    pa_threaded_mainloop_lock(pcm->p->mainloop); -    err = polyp_finish_poll(pcm->p); -    if (err < 0) -        goto finish; +    assert(pcm->stream);      err = polyp_check_connection(pcm->p);      if (err < 0)          goto finish; -    o = pa_stream_cork(pcm->stream, 0, NULL, NULL); +    o = pa_stream_cork(pcm->stream, 0, polyp_stream_success_cb, pcm->p);      assert(o);      err = polyp_wait_operation(pcm->p, o); @@ -100,7 +93,7 @@ static int polyp_start(snd_pcm_ioplug_t *io)      }  finish: -    pthread_mutex_unlock(&pcm->mutex); +    pa_threaded_mainloop_unlock(pcm->p->mainloop);  	return err;  } @@ -112,20 +105,17 @@ static int polyp_stop(snd_pcm_ioplug_t *io)  	int err = 0;      assert(pcm); +    assert(pcm->p); -    pthread_mutex_lock(&pcm->mutex); - -    assert(pcm->p && pcm->stream); +    pa_threaded_mainloop_lock(pcm->p->mainloop); -    err = polyp_finish_poll(pcm->p); -    if (err < 0) -        goto finish; +    assert(pcm->stream);      err = polyp_check_connection(pcm->p);      if (err < 0)          goto finish; -    o = pa_stream_flush(pcm->stream, NULL, NULL); +    o = pa_stream_flush(pcm->stream, polyp_stream_success_cb, pcm->p);      assert(o);      err = polyp_wait_operation(pcm->p, o); @@ -137,7 +127,7 @@ static int polyp_stop(snd_pcm_ioplug_t *io)          goto finish;      } -    o = pa_stream_cork(pcm->stream, 1, NULL, NULL); +    o = pa_stream_cork(pcm->stream, 1, polyp_stream_success_cb, pcm->p);      assert(o);      err = polyp_wait_operation(pcm->p, o); @@ -150,7 +140,7 @@ static int polyp_stop(snd_pcm_ioplug_t *io)      }  finish: -    pthread_mutex_unlock(&pcm->mutex); +    pa_threaded_mainloop_unlock(pcm->p->mainloop);  	return err;  } @@ -162,20 +152,17 @@ int polyp_drain(snd_pcm_ioplug_t *io)  	int err = 0;      assert(pcm); +    assert(pcm->p); -    pthread_mutex_lock(&pcm->mutex); - -    assert(pcm->p && pcm->stream); +    pa_threaded_mainloop_lock(pcm->p->mainloop); -    err = polyp_finish_poll(pcm->p); -    if (err < 0) -        goto finish; +    assert(pcm->stream);      err = polyp_check_connection(pcm->p);      if (err < 0)          goto finish; -    o = pa_stream_drain(pcm->stream, NULL, NULL); +    o = pa_stream_drain(pcm->stream, polyp_stream_success_cb, pcm->p);      assert(o);      err = polyp_wait_operation(pcm->p, o); @@ -188,7 +175,7 @@ int polyp_drain(snd_pcm_ioplug_t *io)      }  finish: -    pthread_mutex_unlock(&pcm->mutex); +    pa_threaded_mainloop_unlock(pcm->p->mainloop);  	return err;  } @@ -199,14 +186,11 @@ static snd_pcm_sframes_t polyp_pointer(snd_pcm_ioplug_t *io)  	int err = 0;      assert(pcm); +    assert(pcm->p); -    pthread_mutex_lock(&pcm->mutex); - -    assert(pcm->p && pcm->stream); +    pa_threaded_mainloop_lock(pcm->p->mainloop); -    err = polyp_finish_poll(pcm->p); -    if (err < 0) -        goto finish; +    assert(pcm->stream);      err = polyp_check_connection(pcm->p);      if (err < 0) @@ -214,14 +198,41 @@ static snd_pcm_sframes_t polyp_pointer(snd_pcm_ioplug_t *io)      update_ptr(pcm); -    err = polyp_start_poll(pcm->p); +	err = snd_pcm_bytes_to_frames(io->pcm, pcm->ptr); + +finish: +    pa_threaded_mainloop_unlock(pcm->p->mainloop); + +	return err; +} + +static int polyp_delay(snd_pcm_ioplug_t *io, +                       snd_pcm_sframes_t *delayp) +{ +	snd_pcm_polyp_t *pcm = io->private_data; +	int err = 0; +	pa_usec_t lat; + +    assert(pcm); +    assert(pcm->p); + +    pa_threaded_mainloop_lock(pcm->p->mainloop); + +    assert(pcm->stream); + +    err = polyp_check_connection(pcm->p);      if (err < 0)          goto finish; -	err = snd_pcm_bytes_to_frames(io->pcm, pcm->ptr); +	if (pa_stream_get_latency(pcm->stream, &lat, NULL)) { +		err = -EIO; +		goto finish; +	} + +	*delayp = snd_pcm_bytes_to_frames(io->pcm, pa_usec_to_bytes(lat, &pcm->ss));  finish: -    pthread_mutex_unlock(&pcm->mutex); +    pa_threaded_mainloop_unlock(pcm->p->mainloop);  	return err;  } @@ -236,14 +247,11 @@ static snd_pcm_sframes_t polyp_write(snd_pcm_ioplug_t *io,  	int err = 0;      assert(pcm); +    assert(pcm->p); -    pthread_mutex_lock(&pcm->mutex); - -    assert(pcm->p && pcm->stream); +    pa_threaded_mainloop_lock(pcm->p->mainloop); -    err = polyp_finish_poll(pcm->p); -    if (err < 0) -        goto finish; +    assert(pcm->stream);      err = polyp_check_connection(pcm->p);      if (err < 0) @@ -261,10 +269,13 @@ static snd_pcm_sframes_t polyp_write(snd_pcm_ioplug_t *io,      /* Make sure the buffer pointer is in sync */      update_ptr(pcm); +    if (pcm->last_size < pcm->buffer_attr.minreq) +        polyp_poll_deactivate(pcm->p); +      err = size;  finish: -    pthread_mutex_unlock(&pcm->mutex); +    pa_threaded_mainloop_unlock(pcm->p->mainloop);  	return err;  } @@ -280,14 +291,11 @@ static snd_pcm_sframes_t polyp_read(snd_pcm_ioplug_t *io,  	int err = 0;      assert(pcm); +    assert(pcm->p); -    pthread_mutex_lock(&pcm->mutex); - -    assert(pcm->p && pcm->stream); +    pa_threaded_mainloop_lock(pcm->p->mainloop); -    err = polyp_finish_poll(pcm->p); -    if (err < 0) -        goto finish; +    assert(pcm->stream);      err = polyp_check_connection(pcm->p);      if (err < 0) @@ -325,28 +333,40 @@ static snd_pcm_sframes_t polyp_read(snd_pcm_ioplug_t *io,      /* Make sure the buffer pointer is in sync */      update_ptr(pcm); +    if (pcm->last_size < pcm->buffer_attr.minreq) +        polyp_poll_deactivate(pcm->p); +      err = size - (remain_size / pcm->frame_size);  finish: -    pthread_mutex_unlock(&pcm->mutex); +    pa_threaded_mainloop_unlock(pcm->p->mainloop);  	return err;  } +static void stream_request_cb(pa_stream *p, size_t length, void *userdata) +{ +    snd_pcm_polyp_t *pcm = userdata; + +    assert(pcm); +    assert(pcm->p); + +    polyp_poll_activate(pcm->p); +} +  static int polyp_pcm_poll_descriptors_count(snd_pcm_ioplug_t *io)  {  	snd_pcm_polyp_t *pcm = io->private_data;  	int count;      assert(pcm); - -    pthread_mutex_lock(&pcm->mutex); -      assert(pcm->p); +    pa_threaded_mainloop_lock(pcm->p->mainloop); +      count = polyp_poll_descriptors_count(pcm->p); -    pthread_mutex_unlock(&pcm->mutex); +    pa_threaded_mainloop_unlock(pcm->p->mainloop);  	return count;  } @@ -357,14 +377,13 @@ static int polyp_pcm_poll_descriptors(snd_pcm_ioplug_t *io, struct pollfd *pfd,  	int err;      assert(pcm); - -    pthread_mutex_lock(&pcm->mutex); -      assert(pcm->p); +    pa_threaded_mainloop_lock(pcm->p->mainloop); +      err = polyp_poll_descriptors(pcm->p, pfd, space); -    pthread_mutex_unlock(&pcm->mutex); +    pa_threaded_mainloop_unlock(pcm->p->mainloop);  	return err;  } @@ -375,11 +394,10 @@ static int polyp_pcm_poll_revents(snd_pcm_ioplug_t *io, struct pollfd *pfd, unsi  	int err = 0;      assert(pcm); - -    pthread_mutex_lock(&pcm->mutex); -      assert(pcm->p); +    pa_threaded_mainloop_lock(pcm->p->mainloop); +      err = polyp_poll_revents(pcm->p, pfd, nfds, revents);      if (err < 0)          goto finish; @@ -402,25 +420,21 @@ static int polyp_pcm_poll_revents(snd_pcm_ioplug_t *io, struct pollfd *pfd, unsi      }  finish: -    pthread_mutex_unlock(&pcm->mutex); +    pa_threaded_mainloop_unlock(pcm->p->mainloop);  	return err;  }  static int polyp_prepare(snd_pcm_ioplug_t *io)  { +    pa_channel_map map;      snd_pcm_polyp_t *pcm = io->private_data;      int err = 0;      assert(pcm); - -    pthread_mutex_lock(&pcm->mutex); -      assert(pcm->p); -    err = polyp_finish_poll(pcm->p); -    if (err < 0) -        goto finish; +    pa_threaded_mainloop_lock(pcm->p->mainloop);      if (pcm->stream) {          pa_stream_disconnect(pcm->stream); @@ -436,15 +450,24 @@ static int polyp_prepare(snd_pcm_ioplug_t *io)      assert(pcm->stream == NULL);      if (io->stream == SND_PCM_STREAM_PLAYBACK) -        pcm->stream = pa_stream_new(pcm->p->context, "ALSA Playback", &pcm->ss, NULL); +        pcm->stream = pa_stream_new(pcm->p->context, "ALSA Playback", &pcm->ss, +            pa_channel_map_init_auto(&map, pcm->ss.channels, PA_CHANNEL_MAP_ALSA));      else -        pcm->stream = pa_stream_new(pcm->p->context, "ALSA Capture", &pcm->ss, NULL); +        pcm->stream = pa_stream_new(pcm->p->context, "ALSA Capture", &pcm->ss, +            pa_channel_map_init_auto(&map, pcm->ss.channels, PA_CHANNEL_MAP_ALSA));      assert(pcm->stream); -    if (io->stream == SND_PCM_STREAM_PLAYBACK) -        pa_stream_connect_playback(pcm->stream, pcm->device, &pcm->buffer_attr, 0, NULL, NULL); -    else -        pa_stream_connect_record(pcm->stream, pcm->device, &pcm->buffer_attr, 0); +    pa_stream_set_state_callback(pcm->stream, polyp_stream_state_cb, pcm->p); + +    if (io->stream == SND_PCM_STREAM_PLAYBACK) { +        pa_stream_set_write_callback(pcm->stream, stream_request_cb, pcm); +        pa_stream_connect_playback(pcm->stream, pcm->device, &pcm->buffer_attr, +        	PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING, NULL, NULL); +    } else { +        pa_stream_set_read_callback(pcm->stream, stream_request_cb, pcm); +        pa_stream_connect_record(pcm->stream, pcm->device, &pcm->buffer_attr, +	        PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING); +    }      err = polyp_wait_stream_state(pcm->p, pcm->stream, PA_STREAM_READY);      if (err < 0) { @@ -459,7 +482,7 @@ static int polyp_prepare(snd_pcm_ioplug_t *io)      pcm->offset = 0;  finish: -    pthread_mutex_unlock(&pcm->mutex); +    pa_threaded_mainloop_unlock(pcm->p->mainloop);  	return err;  } @@ -470,10 +493,11 @@ static int polyp_hw_params(snd_pcm_ioplug_t *io, snd_pcm_hw_params_t *params)  	int err = 0;      assert(pcm); +    assert(pcm->p); -    pthread_mutex_lock(&pcm->mutex); +    pa_threaded_mainloop_lock(pcm->p->mainloop); -    assert(pcm->p && !pcm->stream); +    assert(!pcm->stream);      pcm->frame_size = (snd_pcm_format_physical_width(io->format) * io->channels) / 8; @@ -516,7 +540,7 @@ static int polyp_hw_params(snd_pcm_ioplug_t *io, snd_pcm_hw_params_t *params)      pcm->buffer_attr.fragsize = io->period_size * pcm->frame_size;  finish: -    pthread_mutex_unlock(&pcm->mutex); +    pa_threaded_mainloop_unlock(pcm->p->mainloop);  	return err;  } @@ -527,51 +551,55 @@ static int polyp_close(snd_pcm_ioplug_t *io)      assert(pcm); +    pa_threaded_mainloop_lock(pcm->p->mainloop); +      if (pcm->stream) {          pa_stream_disconnect(pcm->stream);          polyp_wait_stream_state(pcm->p, pcm->stream, PA_STREAM_TERMINATED);          pa_stream_unref(pcm->stream);      } +    pa_threaded_mainloop_unlock(pcm->p->mainloop); +      if (pcm->p)          polyp_free(pcm->p);      if (pcm->device)          free(pcm->device); -    pthread_mutex_destroy(&pcm->mutex); -  	free(pcm);  	return 0;  }  static snd_pcm_ioplug_callback_t polyp_playback_callback = { -	.start = polyp_start, -	.stop = polyp_stop, +    .start = polyp_start, +    .stop = polyp_stop,      .drain = polyp_drain, -	.pointer = polyp_pointer, -	.transfer = polyp_write, +    .pointer = polyp_pointer, +    .transfer = polyp_write, +    .delay = polyp_delay,      .poll_descriptors_count = polyp_pcm_poll_descriptors_count,      .poll_descriptors = polyp_pcm_poll_descriptors,      .poll_revents = polyp_pcm_poll_revents, -	.prepare = polyp_prepare, -	.hw_params = polyp_hw_params, -	.close = polyp_close, +    .prepare = polyp_prepare, +    .hw_params = polyp_hw_params, +    .close = polyp_close,  };  static snd_pcm_ioplug_callback_t polyp_capture_callback = { -	.start = polyp_start, -	.stop = polyp_stop, -	.pointer = polyp_pointer, -	.transfer = polyp_read, +    .start = polyp_start, +    .stop = polyp_stop, +    .pointer = polyp_pointer, +    .transfer = polyp_read, +    .delay = polyp_delay,      .poll_descriptors_count = polyp_pcm_poll_descriptors_count,      .poll_descriptors = polyp_pcm_poll_descriptors,      .poll_revents = polyp_pcm_poll_revents, -	.prepare = polyp_prepare, -	.hw_params = polyp_hw_params, -	.close = polyp_close, +    .prepare = polyp_prepare, +    .hw_params = polyp_hw_params, +    .close = polyp_close,  }; @@ -645,7 +673,6 @@ SND_PCM_PLUGIN_DEFINE_FUNC(polyp)  	const char *device = NULL;  	int err;  	snd_pcm_polyp_t *pcm; -    pthread_mutexattr_t mutexattr;  	snd_config_for_each(i, next, conf) {  		snd_config_t *n = snd_config_iterator_entry(i); @@ -678,21 +705,15 @@ SND_PCM_PLUGIN_DEFINE_FUNC(polyp)          pcm->device = strdup(device);      pcm->p = polyp_new(); -    assert(pcm->p); - -    err = polyp_connect(pcm->p, server); -    if (err < 0) +    if (!pcm->p) { +        err = -EIO;          goto error; +    } -    err = polyp_start_thread(pcm->p); +    err = polyp_connect(pcm->p, server);      if (err < 0)          goto error; -    pthread_mutexattr_init(&mutexattr); -    pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE); -    pthread_mutex_init(&pcm->mutex, &mutexattr); -    pthread_mutexattr_destroy(&mutexattr); -  	pcm->io.version = SND_PCM_IOPLUG_VERSION;  	pcm->io.name = "ALSA <-> Polypaudio PCM I/O Plugin";  	pcm->io.poll_fd = -1; diff --git a/polyp/polyp.c b/polyp/polyp.c index f10221c..f5e5cac 100644 --- a/polyp/polyp.c +++ b/polyp/polyp.c @@ -19,177 +19,71 @@   */  #include <stdio.h> +#include <unistd.h>  #include <signal.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/un.h> - -#include <pthread.h> +#include <sys/poll.h>  #include "polyp.h" -enum { -    COMMAND_POLL = 'p', -    COMMAND_QUIT = 'q', -    COMMAND_POLL_DONE = 'P', -    COMMAND_POLL_FAILED = 'F', -}; - -static int write_command(snd_polyp_t *p, char command) -{ -    if (write(p->main_fd, &command, 1) != 1) -        return -errno; -    return 0; -} - -static int write_reply(snd_polyp_t *p, char reply) -{ -    if (write(p->thread_fd, &reply, 1) != 1) -        return -errno; -    return 0; -} - -static int read_command(snd_polyp_t *p) -{ -    char command; - -    if (read(p->thread_fd, &command, 1) != 1) -        return -errno; - -    return command; -} - -static int read_reply(snd_polyp_t *p) -{ -    char reply; - -    if (read(p->main_fd, &reply, 1) != 1) -        return -errno; - -    return reply; -} - -static void* thread_func(void *data) +int polyp_check_connection(snd_polyp_t *p)  { -    snd_polyp_t *p = (snd_polyp_t*)data; -    sigset_t mask; -    char command; -    int ret; - -    sigfillset(&mask); -    pthread_sigmask(SIG_BLOCK, &mask, NULL); - -    do { -        command = read_command(p); -        if (command < 0) -            break; +    pa_context_state_t state; -        switch (command) { -        case COMMAND_POLL: -            do { -                ret = pa_mainloop_poll(p->mainloop); -            } while ((ret < 0) && (errno == EINTR)); +    assert(p && p->context && p->mainloop); -            ret = write_reply(p, (ret < 0) ? COMMAND_POLL_FAILED : COMMAND_POLL_DONE); -            if (ret < 0) -                return NULL; +    state = pa_context_get_state(p->context); -            break; -        } -    } while (command != COMMAND_QUIT); +    if (state != PA_CONTEXT_READY) +        return -EIO; -    return NULL; +    return 0;  } -int polyp_start_poll(snd_polyp_t *p) +void polyp_stream_state_cb(pa_stream *s, void * userdata)  { -    int err; +    snd_polyp_t *p = userdata; +    assert(s);      assert(p); -    if (p->state == POLYP_STATE_POLLING) -        return 0; - -    assert(p->state == POLYP_STATE_READY); - -    err = pa_mainloop_prepare(p->mainloop, -1); -    if (err < 0) -        return err; - -    err = write_command(p, COMMAND_POLL); -    if (err < 0) -        return err; - -    p->state = POLYP_STATE_POLLING; - -    return 0; +    pa_threaded_mainloop_signal(p->mainloop, 0);  } -int polyp_finish_poll(snd_polyp_t *p) +void polyp_stream_success_cb(pa_stream *s, int success, void *userdata)  { -	char reply; -	int err; +    snd_polyp_t *p = userdata; +    assert(s);      assert(p); -    if (p->state == POLYP_STATE_READY) -        return 0; - -    assert(p->state == POLYP_STATE_POLLING); - -    p->state = POLYP_STATE_READY; - -    pa_mainloop_wakeup(p->mainloop); - -    reply = read_reply(p); - -    if (reply == COMMAND_POLL_DONE) { -        err = pa_mainloop_dispatch(p->mainloop); -        if (err < 0) -            return err; -    } else -        return -EIO; - -    return 0; +    pa_threaded_mainloop_signal(p->mainloop, 0);  } -int polyp_check_connection(snd_polyp_t *p) +void polyp_context_success_cb(pa_context *c, int success, void *userdata)  { -    pa_context_state_t state; - -    assert(p && p->context); +    snd_polyp_t *p = userdata; -    state = pa_context_get_state(p->context); - -    if (state != PA_CONTEXT_READY) -        return -EIO; +    assert(c); +    assert(p); -    return 0; +    pa_threaded_mainloop_signal(p->mainloop, 0);  }  int polyp_wait_operation(snd_polyp_t *p, pa_operation *o)  { -    int err; +    assert(p && o && (p->state == POLYP_STATE_READY) && p->mainloop); -    assert(p && o && (p->state == POLYP_STATE_READY)); - -    while (pa_operation_get_state(o) == PA_OPERATION_RUNNING) { -        p->state = POLYP_STATE_POLLING; -        err = pa_mainloop_iterate(p->mainloop, 1, NULL); -        p->state = POLYP_STATE_READY; -        if (err < 0) -            return err; -    } +    while (pa_operation_get_state(o) == PA_OPERATION_RUNNING) +        pa_threaded_mainloop_wait(p->mainloop);      return 0;  }  int polyp_wait_stream_state(snd_polyp_t *p, pa_stream *stream, pa_stream_state_t target)  { -    int err;      pa_stream_state_t state; -    assert(p && stream && (p->state == POLYP_STATE_READY)); +    assert(p && stream && (p->state == POLYP_STATE_READY) && p->mainloop);      while (1) {          state = pa_stream_get_state(stream); @@ -200,34 +94,72 @@ int polyp_wait_stream_state(snd_polyp_t *p, pa_stream *stream, pa_stream_state_t          if (state == target)              break; -        p->state = POLYP_STATE_POLLING; -        err = pa_mainloop_iterate(p->mainloop, 1, NULL); -        p->state = POLYP_STATE_READY; -        if (err < 0) -            return -EIO; +        pa_threaded_mainloop_wait(p->mainloop);      }      return 0;  } +static void context_state_cb(pa_context *c, void *userdata) { +    snd_polyp_t *p = userdata; +    assert(c); + +    switch (pa_context_get_state(c)) { +        case PA_CONTEXT_READY: +        case PA_CONTEXT_TERMINATED: +        case PA_CONTEXT_FAILED: +            pa_threaded_mainloop_signal(p->mainloop, 0); +            break; + +        case PA_CONTEXT_UNCONNECTED: +        case PA_CONTEXT_CONNECTING: +        case PA_CONTEXT_AUTHORIZING: +        case PA_CONTEXT_SETTING_NAME: +            break; +    } +} +  snd_polyp_t *polyp_new()  {      snd_polyp_t *p; +	int fd[2] = { -1, -1 }; +	char proc[PATH_MAX], buf[PATH_MAX + 20];      p = calloc(1, sizeof(snd_polyp_t));      assert(p);      p->state = POLYP_STATE_INIT; -    p->main_fd = -1; -    p->thread_fd = -1; -    p->thread_running = 0; +    if (pipe(fd)) { +        free(p); +        return NULL; +    } + +    p->main_fd = fd[0]; +    p->thread_fd = fd[1]; + +    fcntl(fd[0], F_SETFL, O_NONBLOCK); +    fcntl(fd[1], F_SETFL, O_NONBLOCK); -    p->mainloop = pa_mainloop_new(); +    signal(SIGPIPE, SIG_IGN); /* Yes, ugly as hell */ + +    p->mainloop = pa_threaded_mainloop_new();      assert(p->mainloop); -    p->context = pa_context_new(pa_mainloop_get_api(p->mainloop), -        "ALSA Plugin"); +    if (pa_threaded_mainloop_start(p->mainloop) < 0) { +        pa_threaded_mainloop_free(p->mainloop); +        close(fd[0]); +        close(fd[1]); +        free(p); +        return NULL; +    } + +    if (pa_get_binary_name(proc, sizeof(proc))) +        snprintf(buf, sizeof(buf), "ALSA plug-in [%s]", pa_path_get_filename(proc)); +    else +        snprintf(buf, sizeof(buf), "ALSA plug-in"); + +    p->context = pa_context_new(pa_threaded_mainloop_get_api(p->mainloop), buf);      assert(p->context);      return p; @@ -235,22 +167,13 @@ snd_polyp_t *polyp_new()  void polyp_free(snd_polyp_t *p)  { -    if (p->thread_running) { -        assert(p->mainloop && p->thread); -        write_command(p, COMMAND_QUIT); -        pa_mainloop_wakeup(p->mainloop); -        pthread_join(p->thread, NULL); -    } +    pa_threaded_mainloop_stop(p->mainloop); -    if (p->context) -        pa_context_unref(p->context); -    if (p->mainloop) -        pa_mainloop_free(p->mainloop); +    pa_context_unref(p->context); +    pa_threaded_mainloop_free(p->mainloop); -    if (p->thread_fd >= 0) -        close(p->thread_fd); -    if (p->main_fd >= 0) -        close(p->main_fd); +    close(p->thread_fd); +    close(p->main_fd);      free(p);  } @@ -258,70 +181,52 @@ void polyp_free(snd_polyp_t *p)  int polyp_connect(snd_polyp_t *p, const char *server)  {      int err; -    pa_context_state_t state;      assert(p && p->context && p->mainloop && (p->state == POLYP_STATE_INIT)); +    pa_threaded_mainloop_lock(p->mainloop); +      err = pa_context_connect(p->context, server, 0, NULL);      if (err < 0)          goto error; -    while (1) { -        state = pa_context_get_state(p->context); +    pa_context_set_state_callback(p->context, context_state_cb, p); -        if (state == PA_CONTEXT_FAILED) -            goto error; +    pa_threaded_mainloop_wait(p->mainloop); -        if (state == PA_CONTEXT_READY) -            break; +    if (pa_context_get_state(p->context) != PA_CONTEXT_READY) +        goto error; -        err = pa_mainloop_iterate(p->mainloop, 1, NULL); -        if (err < 0) -            return -EIO; -    } +    pa_threaded_mainloop_unlock(p->mainloop); -    p->state = POLYP_STATE_CONNECTED; +    p->state = POLYP_STATE_READY;      return 0;  error:      fprintf(stderr, "*** POLYPAUDIO: Unable to connect: %s\n",          pa_strerror(pa_context_errno(p->context))); + +    pa_threaded_mainloop_unlock(p->mainloop); +      return -ECONNREFUSED;  } -int polyp_start_thread(snd_polyp_t *p) +void polyp_poll_activate(snd_polyp_t *p)  { -    int err; -	int fd[2] = { -1, -1 }; - -    assert(p && (p->state == POLYP_STATE_CONNECTED)); - -    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd) < 0) { -        perror("socketpair()"); -        return -errno; -    } - -    p->thread_fd = fd[0]; -    p->main_fd = fd[1]; - -    p->thread_running = 0; +    assert(p); -    err = pthread_create(&p->thread, NULL, thread_func, p); -    if (err) { -        SNDERR("pthread_create(): %s", strerror(err)); -        close(fd[0]); -        close(fd[1]); -        p->main_fd = -1; -        p->thread_fd = -1; -        return -err; -    } +    write(p->thread_fd, "a", 1); +} -    p->thread_running = 1; +void polyp_poll_deactivate(snd_polyp_t *p) +{ +	char buf[10]; -    p->state = POLYP_STATE_READY; +	assert(p); -    return 0; +    /* Drain the pipe */ +    while (read(p->main_fd, buf, sizeof(buf)) > 0);  }  int polyp_poll_descriptors_count(snd_polyp_t *p) @@ -336,22 +241,12 @@ int polyp_poll_descriptors_count(snd_polyp_t *p)  int polyp_poll_descriptors(snd_polyp_t *p, struct pollfd *pfd, unsigned int space)  { -	int err; -      assert(p); -    err = polyp_finish_poll(p); -    if (err < 0) -        return err; - -    err = polyp_start_poll(p); -    if (err < 0) -        return err; -      assert(space >= 1);      pfd[0].fd = p->main_fd; -    pfd[0].events = POLL_IN; +    pfd[0].events = POLLIN;      pfd[0].revents = 0;      return 1; @@ -359,20 +254,7 @@ int polyp_poll_descriptors(snd_polyp_t *p, struct pollfd *pfd, unsigned int spac  int polyp_poll_revents(snd_polyp_t *p, struct pollfd *pfd, unsigned int nfds, unsigned short *revents)  { -	int err; -      assert(p); -    err = polyp_finish_poll(p); -    if (err < 0) -        return err; - -    err = polyp_check_connection(p); -    if (err < 0) -        return err; - -    /* -     * The application might redo the poll immediatly. -     */ -    return polyp_poll_descriptors(p, pfd, nfds); +    return 1;  } diff --git a/polyp/polyp.h b/polyp/polyp.h index 8c2ab95..f210e18 100644 --- a/polyp/polyp.h +++ b/polyp/polyp.h @@ -21,32 +21,27 @@  #include <alsa/asoundlib.h>  #include <polyp/polypaudio.h> -#include <polyp/mainloop.h>  #define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0]))  typedef struct snd_polyp { -    pa_mainloop *mainloop; +    pa_threaded_mainloop *mainloop;      pa_context *context;      int thread_fd, main_fd; -    pthread_t thread; -    int thread_running; -      enum {          POLYP_STATE_INIT, -        POLYP_STATE_CONNECTED,          POLYP_STATE_READY, -        POLYP_STATE_POLLING,      } state;  } snd_polyp_t; -int polyp_start_poll(snd_polyp_t *p); -int polyp_finish_poll(snd_polyp_t *p); -  int polyp_check_connection(snd_polyp_t *p); +void polyp_stream_state_cb(pa_stream *s, void * userdata); +void polyp_stream_success_cb(pa_stream *s, int success, void *userdata); +void polyp_context_success_cb(pa_context *c, int success, void *userdata); +  int polyp_wait_operation(snd_polyp_t *p, pa_operation *o);  int polyp_wait_stream_state(snd_polyp_t *p, pa_stream *stream, pa_stream_state_t target); @@ -54,8 +49,9 @@ snd_polyp_t *polyp_new();  void polyp_free(snd_polyp_t *p);  int polyp_connect(snd_polyp_t *p, const char *server); -int polyp_start_thread(snd_polyp_t *p); +void polyp_poll_activate(snd_polyp_t *p); +void polyp_poll_deactivate(snd_polyp_t *p);  int polyp_poll_descriptors_count(snd_polyp_t *p);  int polyp_poll_descriptors(snd_polyp_t *p, struct pollfd *pfd, unsigned int space);  int polyp_poll_revents(snd_polyp_t *p, struct pollfd *pfd, unsigned int nfds, unsigned short *revents);  | 
