diff options
-rw-r--r-- | src/modules/module-alsa-sink.c | 248 | ||||
-rw-r--r-- | src/modules/module-alsa-source.c | 273 | ||||
-rw-r--r-- | src/modules/module-null-sink.c | 3 | ||||
-rw-r--r-- | src/modules/module-pipe-sink.c | 3 | ||||
-rw-r--r-- | src/modules/module-pipe-source.c | 3 | ||||
-rw-r--r-- | src/pulsecore/asyncmsgq.c | 36 | ||||
-rw-r--r-- | src/pulsecore/asyncmsgq.h | 15 | ||||
-rw-r--r-- | src/pulsecore/asyncq.c | 124 | ||||
-rw-r--r-- | src/pulsecore/asyncq.h | 21 | ||||
-rw-r--r-- | src/pulsecore/rtpoll.c | 55 | ||||
-rw-r--r-- | src/pulsecore/rtpoll.h | 3 | ||||
-rw-r--r-- | src/pulsecore/thread-mq.c | 35 | ||||
-rw-r--r-- | src/pulsecore/thread-mq.h | 5 |
13 files changed, 524 insertions, 300 deletions
diff --git a/src/modules/module-alsa-sink.c b/src/modules/module-alsa-sink.c index 665dbdee..bc95e6c1 100644 --- a/src/modules/module-alsa-sink.c +++ b/src/modules/module-alsa-sink.c @@ -131,7 +131,6 @@ struct userdata { int64_t frame_index; snd_pcm_sframes_t hwbuf_unused_frames; - snd_pcm_sframes_t avail_min_frames; }; static void fix_tsched_watermark(struct userdata *u) { @@ -140,8 +139,61 @@ static void fix_tsched_watermark(struct userdata *u) { max_use = u->hwbuf_size - u->hwbuf_unused_frames * u->frame_size; - if (u->tsched_watermark >= max_use/2) - u->tsched_watermark = max_use/2; + if (u->tsched_watermark >= max_use-u->frame_size) + u->tsched_watermark = max_use-u->frame_size; +} + +static int try_recover(struct userdata *u, const char *call, int err) { + pa_assert(u); + pa_assert(call); + pa_assert(err < 0); + + pa_log_debug("%s: %s", call, snd_strerror(err)); + + if (err == -EAGAIN) { + pa_log_debug("%s: EAGAIN", call); + return 1; + } + + if (err == -EPIPE) + pa_log_debug("%s: Buffer underrun!", call); + + if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) { + u->first = TRUE; + return 0; + } + + pa_log("%s: %s", call, snd_strerror(err)); + return -1; +} + +static void check_left_to_play(struct userdata *u, snd_pcm_sframes_t n) { + size_t left_to_play; + + if (u->first) + return; + + if (n*u->frame_size < u->hwbuf_size) + left_to_play = u->hwbuf_size - (n*u->frame_size); + else + left_to_play = 0; + + if (left_to_play > 0) + pa_log_debug("%0.2f ms left to play", (double) pa_bytes_to_usec(left_to_play, &u->sink->sample_spec) / PA_USEC_PER_MSEC); + else { + pa_log_info("Underrun!"); + + if (u->use_tsched) { + size_t old_watermark = u->tsched_watermark; + + u->tsched_watermark *= 2; + fix_tsched_watermark(u); + + if (old_watermark != u->tsched_watermark) + pa_log_notice("Increasing wakeup watermark to %0.2f ms", + (double) pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec) / PA_USEC_PER_MSEC); + } + } } static int mmap_write(struct userdata *u) { @@ -154,10 +206,9 @@ static int mmap_write(struct userdata *u) { pa_memchunk chunk; void *p; snd_pcm_sframes_t n; - int err; + int err, r; const snd_pcm_channel_area_t *areas; snd_pcm_uframes_t offset, frames; - size_t left_to_play; snd_pcm_hwsync(u->pcm_handle); @@ -166,70 +217,40 @@ static int mmap_write(struct userdata *u) { if (PA_UNLIKELY((n = snd_pcm_avail_update(u->pcm_handle)) < 0)) { - pa_log_debug("snd_pcm_avail_update: %s", snd_strerror(n)); - - if (err == -EAGAIN) { - pa_log_debug("EAGAIN"); - return work_done; - } - - if (n == -EPIPE) - pa_log_debug("snd_pcm_avail_update: Buffer underrun!"); - - if ((err = snd_pcm_recover(u->pcm_handle, n, 1)) == 0) { - u->first = TRUE; + if ((r = try_recover(u, "snd_pcm_avail_update", n)) == 0) continue; - } + else if (r > 0) + return work_done; - pa_log("snd_pcm_recover: %s", snd_strerror(err)); - return -1; + return r; } + check_left_to_play(u, n); + /* We only use part of the buffer that matches our * dynamically requested latency */ if (PA_UNLIKELY(n <= u->hwbuf_unused_frames)) return work_done; - if (n*u->frame_size < u->hwbuf_size) - left_to_play = u->hwbuf_size - (n*u->frame_size); - else - left_to_play = 0; - - pa_log_debug("%0.2f ms left to play", (double) pa_bytes_to_usec(left_to_play, &u->sink->sample_spec) / PA_USEC_PER_MSEC); - - if (left_to_play <= 0 && !u->first) { - u->tsched_watermark *= 2; - fix_tsched_watermark(u); - pa_log_notice("Underrun! Increasing wakeup watermark to %0.2f ms", - (double) pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec) / PA_USEC_PER_MSEC); - } - frames = n = n - u->hwbuf_unused_frames; - pa_log_debug("%llu frames to write", (unsigned long long) frames); + pa_log_debug("%lu frames to write", (unsigned long) frames); if (PA_UNLIKELY((err = snd_pcm_mmap_begin(u->pcm_handle, &areas, &offset, &frames)) < 0)) { - pa_log_debug("snd_pcm_mmap_begin: %s", snd_strerror(err)); - - if (err == -EAGAIN) { - pa_log_debug("EAGAIN"); - return work_done; - } - - if (err == -EPIPE) - pa_log_debug("snd_pcm_mmap_begin: Buffer underrun!"); - - if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) { - u->first = TRUE; + if ((r = try_recover(u, "snd_pcm_mmap_begin", err)) == 0) continue; - } + else if (r > 0) + return work_done; - pa_log("Failed to write data to DSP: %s", snd_strerror(err)); - return -1; + return r; } + /* Make sure that if these memblocks need to be copied they will fit into one slot */ + if (frames > pa_mempool_block_size_max(u->sink->core->mempool)/u->frame_size) + frames = pa_mempool_block_size_max(u->sink->core->mempool)/u->frame_size; + /* Check these are multiples of 8 bit */ pa_assert((areas[0].first & 7) == 0); pa_assert((areas[0].step & 7)== 0); @@ -240,7 +261,7 @@ static int mmap_write(struct userdata *u) { p = (uint8_t*) areas[0].addr + (offset * u->frame_size); - chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, 1); + chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, TRUE); chunk.length = pa_memblock_get_length(chunk.memblock); chunk.index = 0; @@ -252,30 +273,19 @@ static int mmap_write(struct userdata *u) { if (PA_UNLIKELY((err = snd_pcm_mmap_commit(u->pcm_handle, offset, frames)) < 0)) { - pa_log_debug("snd_pcm_mmap_commit: %s", snd_strerror(err)); - - if (err == -EAGAIN) { - pa_log_debug("EAGAIN"); - return work_done; - } - - if (err == -EPIPE) - pa_log_debug("snd_pcm_mmap_commit: Buffer underrun!"); - - if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) { - u->first = TRUE; + if ((r = try_recover(u, "snd_pcm_mmap_commit", err)) == 0) continue; - } + else if (r > 0) + return work_done; - pa_log("Failed to write data to DSP: %s", snd_strerror(err)); - return -1; + return r; } work_done = 1; u->frame_index += frames; - pa_log_debug("wrote %llu frames", (unsigned long long) frames); + pa_log_debug("wrote %lu frames", (unsigned long) frames); if (PA_LIKELY(frames >= (snd_pcm_uframes_t) n)) return work_done; @@ -283,40 +293,37 @@ static int mmap_write(struct userdata *u) { } static int unix_write(struct userdata *u) { - snd_pcm_status_t *status; int work_done = 0; - snd_pcm_status_alloca(&status); - pa_assert(u); pa_sink_assert_ref(u->sink); for (;;) { void *p; snd_pcm_sframes_t n, frames; - int err; + int r; snd_pcm_hwsync(u->pcm_handle); - snd_pcm_avail_update(u->pcm_handle); - if (PA_UNLIKELY((err = snd_pcm_status(u->pcm_handle, status)) < 0)) { - pa_log("Failed to query DSP status data: %s", snd_strerror(err)); - return -1; - } + if (PA_UNLIKELY((n = snd_pcm_avail_update(u->pcm_handle)) < 0)) { - if (PA_UNLIKELY(snd_pcm_status_get_avail_max(status)*u->frame_size >= u->hwbuf_size)) - pa_log_debug("Buffer underrun!"); + if ((r = try_recover(u, "snd_pcm_avail_update", n)) == 0) + continue; + else if (r > 0) + return work_done; - n = snd_pcm_status_get_avail(status); + return r; + } - /* We only use part of the buffer that matches our - * dynamically requested latency */ + check_left_to_play(u, n); if (PA_UNLIKELY(n <= u->hwbuf_unused_frames)) return work_done; n -= u->hwbuf_unused_frames; + pa_log_debug("%lu frames to write", (unsigned long) frames); + if (u->memchunk.length <= 0) pa_sink_render(u->sink, n * u->frame_size, &u->memchunk); @@ -335,21 +342,12 @@ static int unix_write(struct userdata *u) { if (PA_UNLIKELY(frames < 0)) { - if (frames == -EAGAIN) { - pa_log_debug("EAGAIN"); - return work_done; - } - - if (frames == -EPIPE) - pa_log_debug("snd_pcm_avail_update: Buffer underrun!"); - - if ((frames = snd_pcm_recover(u->pcm_handle, frames, 1)) == 0) { - u->first = TRUE; + if ((r = try_recover(u, "snd_pcm_writei", n)) == 0) continue; - } + else if (r > 0) + return work_done; - pa_log("Failed to write data to DSP: %s", snd_strerror(frames)); - return -1; + return r; } u->memchunk.index += frames * u->frame_size; @@ -364,6 +362,8 @@ static int unix_write(struct userdata *u) { u->frame_index += frames; + pa_log_debug("wrote %lu frames", (unsigned long) frames); + if (PA_LIKELY(frames >= n)) return work_done; } @@ -399,8 +399,8 @@ static void update_smoother(struct userdata *u) { return; } - frames = u->frame_index - delay; + /* pa_log_debug("frame_index = %llu, delay = %llu, p = %llu", (unsigned long long) u->frame_index, (unsigned long long) delay, (unsigned long long) frames); */ /* snd_pcm_status_get_tstamp(status, ×tamp); */ @@ -422,7 +422,7 @@ static pa_usec_t sink_get_latency(struct userdata *u) { now1 = pa_rtclock_usec(); now2 = pa_smoother_get(u->smoother, now1); - delay = (int64_t) pa_bytes_to_usec(u->frame_index * u->frame_size, &u->sink->sample_spec) - now2; + delay = (int64_t) pa_bytes_to_usec(u->frame_index * u->frame_size, &u->sink->sample_spec) - (int64_t) now2; if (delay > 0) r = (pa_usec_t) delay; @@ -434,28 +434,14 @@ static pa_usec_t sink_get_latency(struct userdata *u) { } static int build_pollfd(struct userdata *u) { - int err; - struct pollfd *pollfd; - int n; - pa_assert(u); pa_assert(u->pcm_handle); - if ((n = snd_pcm_poll_descriptors_count(u->pcm_handle)) < 0) { - pa_log("snd_pcm_poll_descriptors_count() failed: %s", snd_strerror(n)); - return -1; - } - if (u->alsa_rtpoll_item) pa_rtpoll_item_free(u->alsa_rtpoll_item); - u->alsa_rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, n); - pollfd = pa_rtpoll_item_get_pollfd(u->alsa_rtpoll_item, NULL); - - if ((err = snd_pcm_poll_descriptors(u->pcm_handle, pollfd, n)) < 0) { - pa_log("snd_pcm_poll_descriptors() failed: %s", snd_strerror(err)); + if (!(u->alsa_rtpoll_item = pa_alsa_build_pollfd(u->pcm_handle, u->rtpoll))) return -1; - } return 0; } @@ -491,7 +477,7 @@ static pa_usec_t hw_sleep_time(struct userdata *u) { if (usec == (pa_usec_t) -1) usec = pa_bytes_to_usec(u->hwbuf_size, &u->sink->sample_spec); -/* pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */ + pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); wm = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec); @@ -505,25 +491,27 @@ static pa_usec_t hw_sleep_time(struct userdata *u) { usec /= 2; } -/* pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */ + pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); return usec; } static int update_sw_params(struct userdata *u) { + snd_pcm_uframes_t avail_min; int err; - pa_usec_t latency; pa_assert(u); /* Use the full buffer if noone asked us for anything specific */ u->hwbuf_unused_frames = 0; - if (u->use_tsched) + if (u->use_tsched) { + pa_usec_t latency; + if ((latency = pa_sink_get_requested_latency_within_thread(u->sink)) != (pa_usec_t) -1) { size_t b; - pa_log_debug("latency set to %llu", (unsigned long long) latency); + pa_log_debug("latency set to %0.2f", (double) latency / PA_USEC_PER_MSEC); b = pa_usec_to_bytes(latency, &u->sink->sample_spec); @@ -538,23 +526,23 @@ static int update_sw_params(struct userdata *u) { fix_tsched_watermark(u); } + } pa_log_debug("hwbuf_unused_frames=%lu", (unsigned long) u->hwbuf_unused_frames); /* We need at last one frame in the used part of the buffer */ - u->avail_min_frames = u->hwbuf_unused_frames + 1; + avail_min = u->hwbuf_unused_frames + 1; if (u->use_tsched) { pa_usec_t usec; usec = hw_sleep_time(u); - - u->avail_min_frames += (pa_usec_to_bytes(usec, &u->sink->sample_spec) / u->frame_size); + avail_min += pa_usec_to_bytes(usec, &u->sink->sample_spec); } - pa_log_debug("setting avail_min=%lu", (unsigned long) u->avail_min_frames); + pa_log_debug("setting avail_min=%lu", (unsigned long) avail_min); - if ((err = pa_alsa_set_sw_params(u->pcm_handle, u->avail_min_frames)) < 0) { + if ((err = pa_alsa_set_sw_params(u->pcm_handle, avail_min)) < 0) { pa_log("Failed to set software parameters: %s", snd_strerror(err)); return err; } @@ -678,14 +666,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse } break; - -/* case PA_SINK_MESSAGE_ADD_INPUT: */ -/* case PA_SINK_MESSAGE_REMOVE_INPUT: */ -/* case PA_SINK_MESSAGE_REMOVE_INPUT_AND_BUFFER: { */ -/* int r = pa_sink_process_msg(o, code, data, offset, chunk); */ -/* update_hwbuf_unused_frames(u); */ -/* return r; */ -/* } */ } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -1091,10 +1071,9 @@ int pa__init(pa_module*m) { u->use_mmap = use_mmap; u->use_tsched = use_tsched; u->first = TRUE; - pa_thread_mq_init(&u->thread_mq, m->core->mainloop); u->rtpoll = pa_rtpoll_new(); + pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll); u->alsa_rtpoll_item = NULL; - pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq); u->smoother = pa_smoother_new(DEFAULT_TSCHED_BUFFER_USEC*2, DEFAULT_TSCHED_BUFFER_USEC*2, TRUE); usec = pa_rtclock_usec(); @@ -1238,7 +1217,6 @@ int pa__init(pa_module*m) { u->nfragments = nfrags; u->hwbuf_size = u->fragment_size * nfrags; u->hwbuf_unused_frames = 0; - u->avail_min_frames = 0; u->tsched_watermark = tsched_watermark; u->frame_index = 0; u->hw_dB_supported = FALSE; @@ -1246,12 +1224,10 @@ int pa__init(pa_module*m) { u->hw_volume_min = u->hw_volume_max = 0; if (use_tsched) - if (u->tsched_watermark >= u->hwbuf_size/2) - u->tsched_watermark = pa_frame_align(u->hwbuf_size/2, &ss); + fix_tsched_watermark(u); u->sink->thread_info.max_rewind = use_tsched ? u->hwbuf_size : 0; u->sink->max_latency = pa_bytes_to_usec(u->hwbuf_size, &ss); - if (!use_tsched) u->sink->min_latency = u->sink->max_latency; @@ -1325,7 +1301,7 @@ int pa__init(pa_module*m) { u->sink->get_volume = sink_get_volume_cb; u->sink->set_volume = sink_set_volume_cb; u->sink->flags |= PA_SINK_HW_VOLUME_CTRL | (u->hw_dB_supported ? PA_SINK_DECIBEL_VOLUME : 0); - pa_log_info("Using hardware volume control. %s dB scale.", u->hw_dB_supported ? "Using" : "Not using"); + pa_log_info("Using hardware volume control. Hardware dB scale %s.", u->hw_dB_supported ? "supported" : "not supported"); } else if (mixer_reset) { pa_log_info("Using software volume control. Trying to reset sound card to 0 dB."); diff --git a/src/modules/module-alsa-source.c b/src/modules/module-alsa-source.c index bcb3c2d0..3999ada7 100644 --- a/src/modules/module-alsa-source.c +++ b/src/modules/module-alsa-source.c @@ -127,8 +127,70 @@ struct userdata { pa_smoother *smoother; int64_t frame_index; + + snd_pcm_sframes_t hwbuf_unused_frames; }; +static void fix_tsched_watermark(struct userdata *u) { + size_t max_use; + pa_assert(u); + + max_use = u->hwbuf_size - u->hwbuf_unused_frames * u->frame_size; + + if (u->tsched_watermark >= max_use-u->frame_size) + u->tsched_watermark = max_use-u->frame_size; +} + +static int try_recover(struct userdata *u, const char *call, int err) { + pa_assert(u); + pa_assert(call); + pa_assert(err < 0); + + pa_log_debug("%s: %s", call, snd_strerror(err)); + + if (err == -EAGAIN) { + pa_log_debug("%s: EAGAIN", call); + return 1; + } + + if (err == -EPIPE) + pa_log_debug("%s: Buffer overrun!", call); + + if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) { + snd_pcm_start(u->pcm_handle); + return 0; + } + + pa_log("%s: %s", call, snd_strerror(err)); + return -1; +} + +static void check_left_to_record(struct userdata *u, snd_pcm_sframes_t n) { + size_t left_to_record; + + if (n*u->frame_size < u->hwbuf_size) + left_to_record = u->hwbuf_size - (n*u->frame_size); + else + left_to_record = 0; + + if (left_to_record > 0) + pa_log_debug("%0.2f ms left to record", (double) pa_bytes_to_usec(left_to_record, &u->source->sample_spec) / PA_USEC_PER_MSEC); + else { + pa_log_info("Overrun!"); + + if (u->use_tsched) { + size_t old_watermark = u->tsched_watermark; + + u->tsched_watermark *= 2; + fix_tsched_watermark(u); + + if (old_watermark != u->tsched_watermark) + pa_log_notice("Increasing wakeup watermark to %0.2f ms", + (double) pa_bytes_to_usec(u->tsched_watermark, &u->source->sample_spec) / PA_USEC_PER_MSEC); + } + } +} + static int mmap_read(struct userdata *u) { int work_done = 0; @@ -137,7 +199,7 @@ static int mmap_read(struct userdata *u) { for (;;) { snd_pcm_sframes_t n; - int err; + int err, r; const snd_pcm_channel_area_t *areas; snd_pcm_uframes_t offset, frames; pa_memchunk chunk; @@ -147,51 +209,37 @@ static int mmap_read(struct userdata *u) { if (PA_UNLIKELY((n = snd_pcm_avail_update(u->pcm_handle)) < 0)) { - pa_log_debug("snd_pcm_avail_update: %s", snd_strerror(n)); - - if (err == -EAGAIN) { - pa_log_debug("EAGAIN"); - return work_done; - } - - if (n == -EPIPE) - pa_log_debug("snd_pcm_avail_update: Buffer underrun!"); - - if ((err = snd_pcm_recover(u->pcm_handle, n, 1)) == 0) { - snd_pcm_start(u->pcm_handle); + if ((r = try_recover(u, "snd_pcm_avail_update", err)) == 0) continue; - } + else if (r > 0) + return work_done; - pa_log("snd_pcm_recover: %s", snd_strerror(err)); - return -1; + return r; } + check_left_to_record(u, n); + if (PA_UNLIKELY(n <= 0)) return work_done; frames = n; - if (PA_UNLIKELY((err = snd_pcm_mmap_begin(u->pcm_handle, &areas, &offset, &frames)) < 0)) { + pa_log_debug("%lu frames to read", (unsigned long) frames); - pa_log_debug("snd_pcm_mmap_begin: %s", snd_strerror(err)); - - if (err == -EAGAIN) { - pa_log_debug("EAGAIN"); - return work_done; - } - - if (err == -EPIPE) - pa_log_debug("snd_pcm_mmap_begin: Buffer underrun!"); + if (PA_UNLIKELY((err = snd_pcm_mmap_begin(u->pcm_handle, &areas, &offset, &frames)) < 0)) { - if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) { - snd_pcm_start(u->pcm_handle); + if ((r = try_recover(u, "snd_pcm_mmap_begin", err)) == 0) continue; - } + else if (r > 0) + return work_done; - pa_log("Failed to write data to DSP: %s", snd_strerror(err)); - return -1; + return r; } + /* Make sure that if these memblocks need to be copied they will fit into one slot */ + if (frames > pa_mempool_block_size_max(u->source->core->mempool)/u->frame_size) + frames = pa_mempool_block_size_max(u->source->core->mempool)/u->frame_size; + /* Check these are multiples of 8 bit */ pa_assert((areas[0].first & 7) == 0); pa_assert((areas[0].step & 7)== 0); @@ -202,42 +250,28 @@ static int mmap_read(struct userdata *u) { p = (uint8_t*) areas[0].addr + (offset * u->frame_size); - chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, 1); + chunk.memblock = pa_memblock_new_fixed(u->core->mempool, p, frames * u->frame_size, TRUE); chunk.length = pa_memblock_get_length(chunk.memblock); chunk.index = 0; pa_source_post(u->source, &chunk); - - /* FIXME: Maybe we can do something to keep this memory block - * a little bit longer around? */ pa_memblock_unref_fixed(chunk.memblock); if (PA_UNLIKELY((err = snd_pcm_mmap_commit(u->pcm_handle, offset, frames)) < 0)) { - pa_log_debug("snd_pcm_mmap_commit: %s", snd_strerror(err)); - - if (err == -EAGAIN) { - pa_log_debug("EAGAIN"); - return work_done; - } - - if (err == -EPIPE) - pa_log_debug("snd_pcm_mmap_commit: Buffer underrun!"); - - if ((err = snd_pcm_recover(u->pcm_handle, err, 1)) == 0) { - snd_pcm_start(u->pcm_handle); + if ((r = try_recover(u, "snd_pcm_mmap_commit", err)) == 0) continue; - } + else if (r > 0) + return work_done; - pa_log("Failed to write data to DSP: %s", snd_strerror(err)); - return -1; + return r; } work_done = 1; u->frame_index += frames; - pa_log_debug("read %llu frames", (unsigned long long) frames); + pa_log_debug("read %lu frames", (unsigned long) frames); if (PA_LIKELY(frames >= (snd_pcm_uframes_t) n)) return work_done; @@ -245,31 +279,30 @@ static int mmap_read(struct userdata *u) { } static int unix_read(struct userdata *u) { - snd_pcm_status_t *status; int work_done = 0; - snd_pcm_status_alloca(&status); - pa_assert(u); pa_source_assert_ref(u->source); for (;;) { void *p; snd_pcm_sframes_t n, frames; - int err; + int r; pa_memchunk chunk; snd_pcm_hwsync(u->pcm_handle); - if (PA_UNLIKELY((err = snd_pcm_status(u->pcm_handle, status)) < 0)) { - pa_log("Failed to query DSP status data: %s", snd_strerror(err)); - return -1; - } + if (PA_UNLIKELY((n = snd_pcm_avail_update(u->pcm_handle)) < 0)) { + + if ((r = try_recover(u, "snd_pcm_avail_update", n)) == 0) + continue; + else if (r > 0) + return work_done; - if (PA_UNLIKELY(snd_pcm_status_get_avail_max(status)*u->frame_size >= u->hwbuf_size)) - pa_log_debug("Buffer overrun!"); + return r; + } - n = snd_pcm_status_get_avail(status); + check_left_to_record(u, n); if (PA_UNLIKELY(n <= 0)) return work_done; @@ -281,6 +314,8 @@ static int unix_read(struct userdata *u) { if (frames > n) frames = n; + pa_log_debug("%lu frames to read", (unsigned long) n); + p = pa_memblock_acquire(chunk.memblock); frames = snd_pcm_readi(u->pcm_handle, (uint8_t*) p, frames); pa_memblock_release(chunk.memblock); @@ -290,20 +325,12 @@ static int unix_read(struct userdata *u) { if (PA_UNLIKELY(frames < 0)) { pa_memblock_unref(chunk.memblock); - if (frames == -EAGAIN) { - pa_log_debug("EAGAIN"); - return work_done; - } - - if (frames == -EPIPE) - pa_log_debug("snd_pcm_avail_update: Buffer overrun!"); - - if ((frames = snd_pcm_recover(u->pcm_handle, frames, 1)) == 0) - snd_pcm_start(u->pcm_handle); + if ((r = try_recover(u, "snd_pcm_readi", n)) == 0) continue; + else if (r > 0) + return work_done; - pa_log("Failed to read data from DSP: %s", snd_strerror(frames)); - return -1; + return r; } chunk.index = 0; @@ -316,6 +343,8 @@ static int unix_read(struct userdata *u) { u->frame_index += frames; + pa_log_debug("read %lu frames", (unsigned long) frames); + if (PA_LIKELY(frames >= n)) return work_done; } @@ -344,46 +373,37 @@ static void update_smoother(struct userdata *u) { now1 = pa_rtclock_usec(); now2 = pa_bytes_to_usec(frames * u->frame_size, &u->source->sample_spec); + pa_smoother_put(u->smoother, now1, now2); } static pa_usec_t source_get_latency(struct userdata *u) { pa_usec_t r = 0; int64_t delay; + pa_usec_t now1, now2; pa_assert(u); - delay = pa_smoother_get(u->smoother, pa_rtclock_usec()) - u->frame_index; + now1 = pa_rtclock_usec(); + now2 = pa_smoother_get(u->smoother, now1); + + delay = (int64_t) now2 - pa_bytes_to_usec(u->frame_index * u->frame_size, &u->source->sample_spec); if (delay > 0) - r = pa_bytes_to_usec(delay * u->frame_size, &u->source->sample_spec); + r = (pa_usec_t) delay; return r; } static int build_pollfd(struct userdata *u) { - int err; - struct pollfd *pollfd; - int n; - pa_assert(u); pa_assert(u->pcm_handle); - if ((n = snd_pcm_poll_descriptors_count(u->pcm_handle)) < 0) { - pa_log("snd_pcm_poll_descriptors_count() failed: %s", snd_strerror(n)); - return -1; - } - if (u->alsa_rtpoll_item) pa_rtpoll_item_free(u->alsa_rtpoll_item); - u->alsa_rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, n); - pollfd = pa_rtpoll_item_get_pollfd(u->alsa_rtpoll_item, NULL); - - if ((err = snd_pcm_poll_descriptors(u->pcm_handle, pollfd, n)) < 0) { - pa_log("snd_pcm_poll_descriptors() failed: %s", snd_strerror(err)); + if (!(u->alsa_rtpoll_item = pa_alsa_build_pollfd(u->pcm_handle, u->rtpoll))) return -1; - } return 0; } @@ -418,7 +438,7 @@ static pa_usec_t hw_sleep_time(struct userdata *u) { if (usec == (pa_usec_t) -1) usec = pa_bytes_to_usec(u->hwbuf_size, &u->source->sample_spec); - pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); +/* pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */ wm = pa_bytes_to_usec(u->tsched_watermark, &u->source->sample_spec); @@ -427,29 +447,55 @@ static pa_usec_t hw_sleep_time(struct userdata *u) { else usec /= 2; - pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); +/* pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */ return usec; } static int update_sw_params(struct userdata *u) { - size_t avail_min; + snd_pcm_uframes_t avail_min; int err; pa_assert(u); + /* Use the full buffer if noone asked us for anything specific */ + u->hwbuf_unused_frames = 0; + if (u->use_tsched) { - pa_usec_t usec; + pa_usec_t latency; - usec = hw_sleep_time(u); + if ((latency = pa_source_get_requested_latency_within_thread(u->source)) != (pa_usec_t) -1) { + size_t b; - avail_min = pa_usec_to_bytes(usec, &u->source->sample_spec); + pa_log_debug("latency set to %0.2f", (double) latency / PA_USEC_PER_MSEC); - if (avail_min <= 0) - avail_min = 1; + b = pa_usec_to_bytes(latency, &u->source->sample_spec); - } else - avail_min = 1; + /* We need at least one sample in our buffer */ + + if (PA_UNLIKELY(b < u->frame_size)) + b = u->frame_size; + + u->hwbuf_unused_frames = + PA_LIKELY(b < u->hwbuf_size) ? + ((u->hwbuf_size - b) / u->frame_size) : 0; + + fix_tsched_watermark(u); + } + } + + pa_log_debug("hwbuf_unused_frames=%lu", (unsigned long) u->hwbuf_unused_frames); + + avail_min = 1; + + if (u->use_tsched) { + pa_usec_t usec; + + usec = hw_sleep_time(u); + avail_min += pa_usec_to_bytes(usec, &u->source->sample_spec); + } + + pa_log_debug("setting avail_min=%lu", (unsigned long) avail_min); if ((err = pa_alsa_set_sw_params(u->pcm_handle, avail_min)) < 0) { pa_log("Failed to set software parameters: %s", snd_strerror(err)); @@ -649,7 +695,6 @@ static int source_set_volume_cb(pa_source *s) { long alsa_vol; pa_volume_t vol; - pa_assert(snd_mixer_selem_has_capture_channel(u->mixer_elem, u->mixer_map[i])); vol = PA_MIN(s->volume.values[i], PA_VOLUME_NORM); @@ -670,7 +715,6 @@ static int source_set_volume_cb(pa_source *s) { u->hw_dB_supported = FALSE; } - alsa_vol = (long) roundf(((float) vol * (u->hw_volume_max - u->hw_volume_min)) / PA_VOLUME_NORM) + u->hw_volume_min; alsa_vol = PA_CLAMP_UNLIKELY(alsa_vol, u->hw_volume_min, u->hw_volume_max); @@ -745,6 +789,8 @@ static void thread_func(void *userdata) { for (;;) { int ret; + pa_log_debug("loop"); + /* Read some data and pass it to the sources */ if (PA_SOURCE_OPENED(u->source->thread_info.state)) { int work_done = 0; @@ -757,6 +803,8 @@ static void thread_func(void *userdata) { if (work_done < 0) goto fail; + pa_log_debug("work_done = %i", work_done); + if (work_done) update_smoother(u); @@ -806,8 +854,7 @@ static void thread_func(void *userdata) { } if (revents & (POLLERR|POLLNVAL|POLLHUP)) { - - if (pa_alsa_recover_from_poll(u->pcm_handle, revents)) + if (pa_alsa_recover_from_poll(u->pcm_handle, revents) < 0) goto fail; snd_pcm_start(u->pcm_handle); @@ -910,10 +957,9 @@ int pa__init(pa_module*m) { m->userdata = u; u->use_mmap = use_mmap; u->use_tsched = use_tsched; - pa_thread_mq_init(&u->thread_mq, m->core->mainloop); u->rtpoll = pa_rtpoll_new(); + pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll); u->alsa_rtpoll_item = NULL; - pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq); u->smoother = pa_smoother_new(DEFAULT_TSCHED_WATERMARK_USEC, DEFAULT_TSCHED_WATERMARK_USEC, TRUE); pa_smoother_set_time_offset(u->smoother, pa_rtclock_usec()); @@ -951,7 +997,7 @@ int pa__init(pa_module*m) { if (use_mmap && !b) { pa_log_info("Device doesn't support mmap(), falling back to UNIX read/write mode."); - u->use_mmap = use_mmap = b; + u->use_mmap = use_mmap = FALSE; } if (use_tsched && (!b || !d)) { @@ -1030,7 +1076,7 @@ int pa__init(pa_module*m) { pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, u->device_name); pa_proplist_setf(data.proplist, PA_PROP_DEVICE_BUFFERING_BUFFER_SIZE, "%lu", (unsigned long) (period_frames * frame_size * nfrags)); pa_proplist_setf(data.proplist, PA_PROP_DEVICE_BUFFERING_FRAGMENT_SIZE, "%lu", (unsigned long) (period_frames * frame_size)); - pa_proplist_sets(data.proplist, PA_PROP_DEVICE_ACCESS_MODE, u->use_tsched ? "mmap_rewrite" : (u->use_mmap ? "mmap" : "serial")); + pa_proplist_sets(data.proplist, PA_PROP_DEVICE_ACCESS_MODE, u->use_tsched ? "mmap+timer" : (u->use_mmap ? "mmap" : "serial")); u->source = pa_source_new(m->core, &data, PA_SOURCE_HARDWARE|PA_SOURCE_LATENCY); pa_source_new_data_done(&data); @@ -1052,14 +1098,17 @@ int pa__init(pa_module*m) { u->fragment_size = frag_size = period_frames * frame_size; u->nfragments = nfrags; u->hwbuf_size = u->fragment_size * nfrags; + u->hwbuf_unused_frames = 0; u->tsched_watermark = tsched_watermark; u->frame_index = 0; u->hw_dB_supported = FALSE; u->hw_dB_min = u->hw_dB_max = 0; u->hw_volume_min = u->hw_volume_max = 0; - u->source->max_latency = pa_bytes_to_usec(u->hwbuf_size, &ss); + if (use_tsched) + fix_tsched_watermark(u); + u->source->max_latency = pa_bytes_to_usec(u->hwbuf_size, &ss); if (!use_tsched) u->source->min_latency = u->source->max_latency; @@ -1095,7 +1144,7 @@ int pa__init(pa_module*m) { pa_log_info("Device has less than 4 volume levels. Falling back to software volume control."); suitable = FALSE; - } else if (snd_mixer_selem_get_playback_dB_range(u->mixer_elem, &u->hw_dB_min, &u->hw_dB_max) >= 0) { + } else if (snd_mixer_selem_get_capture_dB_range(u->mixer_elem, &u->hw_dB_min, &u->hw_dB_max) >= 0) { pa_log_info("Volume ranges from %0.2f dB to %0.2f dB.", u->hw_dB_min/100.0, u->hw_dB_max/100.0); @@ -1122,7 +1171,7 @@ int pa__init(pa_module*m) { u->source->get_volume = source_get_volume_cb; u->source->set_volume = source_set_volume_cb; u->source->flags |= PA_SOURCE_HW_VOLUME_CTRL | (u->hw_dB_supported ? PA_SOURCE_DECIBEL_VOLUME : 0); - pa_log_info("Using hardware volume control. %s dB scale.", u->hw_dB_supported ? "Using" : "Not using"); + pa_log_info("Using hardware volume control. Hardware dB scale %s.", u->hw_dB_supported ? "supported" : "not supported"); } else if (mixer_reset) { pa_log_info("Using software volume control. Trying to reset sound card to 0 dB."); diff --git a/src/modules/module-null-sink.c b/src/modules/module-null-sink.c index 6e59c62c..2301f088 100644 --- a/src/modules/module-null-sink.c +++ b/src/modules/module-null-sink.c @@ -188,9 +188,8 @@ int pa__init(pa_module*m) { u->core = m->core; u->module = m; m->userdata = u; - pa_thread_mq_init(&u->thread_mq, m->core->mainloop); u->rtpoll = pa_rtpoll_new(); - pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq); + pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll); pa_sink_new_data_init(&data); data.driver = __FILE__; diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c index 73a52112..870b32ff 100644 --- a/src/modules/module-pipe-sink.c +++ b/src/modules/module-pipe-sink.c @@ -227,9 +227,8 @@ int pa__init(pa_module*m) { u->module = m; m->userdata = u; pa_memchunk_reset(&u->memchunk); - pa_thread_mq_init(&u->thread_mq, m->core->mainloop); u->rtpoll = pa_rtpoll_new(); - pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq); + pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll); u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c index cf88c823..5bf4da1f 100644 --- a/src/modules/module-pipe-source.c +++ b/src/modules/module-pipe-source.c @@ -204,9 +204,8 @@ int pa__init(pa_module*m) { u->module = m; m->userdata = u; pa_memchunk_reset(&u->memchunk); - pa_thread_mq_init(&u->thread_mq, m->core->mainloop); u->rtpoll = pa_rtpoll_new(); - pa_rtpoll_item_new_asyncmsgq(u->rtpoll, PA_RTPOLL_EARLY, u->thread_mq.inq); + pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll); u->filename = pa_xstrdup(pa_modargs_get_value(ma, "file", DEFAULT_FILE_NAME)); diff --git a/src/pulsecore/asyncmsgq.c b/src/pulsecore/asyncmsgq.c index 96b43a71..eba1c2cb 100644 --- a/src/pulsecore/asyncmsgq.c +++ b/src/pulsecore/asyncmsgq.c @@ -136,7 +136,7 @@ void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const vo /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */ pa_mutex_lock(a->mutex); - pa_assert_se(pa_asyncq_push(a->asyncq, i, 1) == 0); + pa_asyncq_post(a->asyncq, i); pa_mutex_unlock(a->mutex); } @@ -163,7 +163,7 @@ int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const voi /* Thus mutex makes the queue multiple-writer safe. This lock is only used on the writing side */ pa_mutex_lock(a->mutex); - pa_assert_se(pa_asyncq_push(a->asyncq, &i, 1) == 0); + pa_assert_se(pa_asyncq_push(a->asyncq, &i, TRUE) == 0); pa_mutex_unlock(a->mutex); pa_semaphore_wait(i.semaphore); @@ -174,7 +174,7 @@ int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const voi return i.ret; } -int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, int wait) { +int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, pa_bool_t wait) { pa_assert(PA_REFCNT_VALUE(a) > 0); pa_assert(!a->current); @@ -276,22 +276,40 @@ int pa_asyncmsgq_process_one(pa_asyncmsgq *a) { return 1; } -int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) { +int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) { pa_assert(PA_REFCNT_VALUE(a) > 0); - return pa_asyncq_get_fd(a->asyncq); + return pa_asyncq_read_fd(a->asyncq); } -int pa_asyncmsgq_before_poll(pa_asyncmsgq *a) { +int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) { pa_assert(PA_REFCNT_VALUE(a) > 0); - return pa_asyncq_before_poll(a->asyncq); + return pa_asyncq_read_before_poll(a->asyncq); } -void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) { +void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) { pa_assert(PA_REFCNT_VALUE(a) > 0); - pa_asyncq_after_poll(a->asyncq); + pa_asyncq_read_after_poll(a->asyncq); +} + +int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) { + pa_assert(PA_REFCNT_VALUE(a) > 0); + + return pa_asyncq_write_fd(a->asyncq); +} + +void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) { + pa_assert(PA_REFCNT_VALUE(a) > 0); + + pa_asyncq_write_before_poll(a->asyncq); +} + +void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) { + pa_assert(PA_REFCNT_VALUE(a) > 0); + + pa_asyncq_write_after_poll(a->asyncq); } int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) { diff --git a/src/pulsecore/asyncmsgq.h b/src/pulsecore/asyncmsgq.h index 575f760f..93f1ce86 100644 --- a/src/pulsecore/asyncmsgq.h +++ b/src/pulsecore/asyncmsgq.h @@ -62,15 +62,20 @@ void pa_asyncmsgq_unref(pa_asyncmsgq* q); void pa_asyncmsgq_post(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk, pa_free_cb_t userdata_free_cb); int pa_asyncmsgq_send(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk); -int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *memchunk, int wait); +int pa_asyncmsgq_get(pa_asyncmsgq *q, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *memchunk, pa_bool_t wait); int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk); void pa_asyncmsgq_done(pa_asyncmsgq *q, int ret); int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code); int pa_asyncmsgq_process_one(pa_asyncmsgq *a); -/* Just for the reading side */ -int pa_asyncmsgq_get_fd(pa_asyncmsgq *q); -int pa_asyncmsgq_before_poll(pa_asyncmsgq *a); -void pa_asyncmsgq_after_poll(pa_asyncmsgq *a); +/* For the reading side */ +int pa_asyncmsgq_read_fd(pa_asyncmsgq *q); +int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a); +void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a); + +/* For the write side */ +int pa_asyncmsgq_write_fd(pa_asyncmsgq *q); +void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a); +void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a); #endif diff --git a/src/pulsecore/asyncq.c b/src/pulsecore/asyncq.c index 75b15c0e..34506e49 100644 --- a/src/pulsecore/asyncq.c +++ b/src/pulsecore/asyncq.c @@ -33,6 +33,8 @@ #include <pulsecore/thread.h> #include <pulsecore/macro.h> #include <pulsecore/core-util.h> +#include <pulsecore/llist.h> +#include <pulsecore/flist.h> #include <pulse/xmalloc.h> #include "asyncq.h" @@ -51,13 +53,24 @@ #define _Y do { } while(0) #endif +struct localq { + void *data; + PA_LLIST_FIELDS(struct localq); +}; + struct pa_asyncq { unsigned size; unsigned read_idx; unsigned write_idx; pa_fdsem *read_fdsem, *write_fdsem; + + PA_LLIST_HEAD(struct localq, localq); + struct localq *last_localq; + pa_bool_t waiting_for_post; }; +PA_STATIC_FLIST_DECLARE(localq, 0, pa_xfree); + #define PA_ASYNCQ_CELLS(x) ((pa_atomic_ptr_t*) ((uint8_t*) (x) + PA_ALIGN(sizeof(struct pa_asyncq)))) static int is_power_of_two(unsigned size) { @@ -80,6 +93,10 @@ pa_asyncq *pa_asyncq_new(unsigned size) { l->size = size; + PA_LLIST_HEAD_INIT(struct localq, l->localq); + l->last_localq = NULL; + l->waiting_for_post = FALSE; + if (!(l->read_fdsem = pa_fdsem_new())) { pa_xfree(l); return NULL; @@ -95,6 +112,7 @@ pa_asyncq *pa_asyncq_new(unsigned size) { } void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) { + struct localq *q; pa_assert(l); if (free_cb) { @@ -104,12 +122,22 @@ void pa_asyncq_free(pa_asyncq *l, pa_free_cb_t free_cb) { free_cb(p); } + while ((q = l->localq)) { + if (free_cb) + free_cb(q->data); + + PA_LLIST_REMOVE(struct localq, l->localq, q); + + if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0) + pa_xfree(q); + } + pa_fdsem_free(l->read_fdsem); pa_fdsem_free(l->write_fdsem); pa_xfree(l); } -int pa_asyncq_push(pa_asyncq*l, void *p, int wait) { +static int push(pa_asyncq*l, void *p, pa_bool_t wait) { int idx; pa_atomic_ptr_t *cells; @@ -141,7 +169,63 @@ int pa_asyncq_push(pa_asyncq*l, void *p, int wait) { return 0; } -void* pa_asyncq_pop(pa_asyncq*l, int wait) { +static pa_bool_t flush_postq(pa_asyncq *l) { + struct localq *q; + + pa_assert(l); + + while ((q = l->last_localq)) { + + if (push(l, q->data, FALSE) < 0) + return FALSE; + + l->last_localq = q->prev; + + PA_LLIST_REMOVE(struct localq, l->localq, q); + + if (pa_flist_push(PA_STATIC_FLIST_GET(localq), q) < 0) + pa_xfree(q); + } + + return TRUE; +} + +int pa_asyncq_push(pa_asyncq*l, void *p, pa_bool_t wait) { + pa_assert(l); + + if (!flush_postq(l)) + return -1; + + return push(l, p, wait); +} + +void pa_asyncq_post(pa_asyncq*l, void *p) { + struct localq *q; + + pa_assert(l); + pa_assert(p); + + if (pa_asyncq_push(l, p, FALSE) >= 0) + return; + + /* OK, we couldn't push anything in the queue. So let's queue it + * locally and push it later */ + + pa_log("q overrun, queuing locally"); + + if (!(q = pa_flist_pop(PA_STATIC_FLIST_GET(localq)))) + q = pa_xnew(struct localq, 1); + + q->data = p; + PA_LLIST_PREPEND(struct localq, l->localq, q); + + if (!l->last_localq) + l->last_localq = q; + + return; +} + +void* pa_asyncq_pop(pa_asyncq*l, pa_bool_t wait) { int idx; void *ret; pa_atomic_ptr_t *cells; @@ -178,13 +262,13 @@ void* pa_asyncq_pop(pa_asyncq*l, int wait) { return ret; } -int pa_asyncq_get_fd(pa_asyncq *q) { +int pa_asyncq_read_fd(pa_asyncq *q) { pa_assert(q); return pa_fdsem_get(q->write_fdsem); } -int pa_asyncq_before_poll(pa_asyncq *l) { +int pa_asyncq_read_before_poll(pa_asyncq *l) { int idx; pa_atomic_ptr_t *cells; @@ -206,8 +290,38 @@ int pa_asyncq_before_poll(pa_asyncq *l) { return 0; } -void pa_asyncq_after_poll(pa_asyncq *l) { +void pa_asyncq_read_after_poll(pa_asyncq *l) { pa_assert(l); pa_fdsem_after_poll(l->write_fdsem); } + +int pa_asyncq_write_fd(pa_asyncq *q) { + pa_assert(q); + + return pa_fdsem_get(q->read_fdsem); +} + +void pa_asyncq_write_before_poll(pa_asyncq *l) { + pa_assert(l); + + for (;;) { + + if (flush_postq(l)) + break; + + if (pa_fdsem_before_poll(l->read_fdsem) >= 0) { + l->waiting_for_post = TRUE; + break; + } + } +} + +void pa_asyncq_write_after_poll(pa_asyncq *l) { + pa_assert(l); + + if (l->waiting_for_post) { + pa_fdsem_after_poll(l->read_fdsem); + l->waiting_for_post = FALSE; + } +} diff --git a/src/pulsecore/asyncq.h b/src/pulsecore/asyncq.h index 53d45866..4cdf8cd0 100644 --- a/src/pulsecore/asyncq.h +++ b/src/pulsecore/asyncq.h @@ -26,6 +26,7 @@ #include <sys/types.h> #include <pulse/def.h> +#include <pulsecore/macro.h> /* A simple, asynchronous, lock-free (if requested also wait-free) * queue. Not multiple-reader/multiple-writer safe. If that is @@ -46,11 +47,21 @@ typedef struct pa_asyncq pa_asyncq; pa_asyncq* pa_asyncq_new(unsigned size); void pa_asyncq_free(pa_asyncq* q, pa_free_cb_t free_cb); -void* pa_asyncq_pop(pa_asyncq *q, int wait); -int pa_asyncq_push(pa_asyncq *q, void *p, int wait); +void* pa_asyncq_pop(pa_asyncq *q, pa_bool_t wait); +int pa_asyncq_push(pa_asyncq *q, void *p, pa_bool_t wait); -int pa_asyncq_get_fd(pa_asyncq *q); -int pa_asyncq_before_poll(pa_asyncq *a); -void pa_asyncq_after_poll(pa_asyncq *a); +/* Similar to pa_asyncq_push(), but if the queue is full, postpone it + * locally and delay until pa_asyncq_before_poll_post() */ +void pa_asyncq_post(pa_asyncq*l, void *p); + +/* For the reading side */ +int pa_asyncq_read_fd(pa_asyncq *q); +int pa_asyncq_read_before_poll(pa_asyncq *a); +void pa_asyncq_read_after_poll(pa_asyncq *a); + +/* For the writing side */ +int pa_asyncq_write_fd(pa_asyncq *q); +void pa_asyncq_write_before_poll(pa_asyncq *a); +void pa_asyncq_write_after_poll(pa_asyncq *a); #endif diff --git a/src/pulsecore/rtpoll.c b/src/pulsecore/rtpoll.c index 734f344f..c3e76cac 100644 --- a/src/pulsecore/rtpoll.c +++ b/src/pulsecore/rtpoll.c @@ -661,23 +661,23 @@ pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio return i; } -static int asyncmsgq_before(pa_rtpoll_item *i) { +static int asyncmsgq_read_before(pa_rtpoll_item *i) { pa_assert(i); - if (pa_asyncmsgq_before_poll(i->userdata) < 0) + if (pa_asyncmsgq_read_before_poll(i->userdata) < 0) return 1; /* 1 means immediate restart of the loop */ return 0; } -static void asyncmsgq_after(pa_rtpoll_item *i) { +static void asyncmsgq_read_after(pa_rtpoll_item *i) { pa_assert(i); pa_assert((i->pollfd[0].revents & ~POLLIN) == 0); - pa_asyncmsgq_after_poll(i->userdata); + pa_asyncmsgq_read_after_poll(i->userdata); } -static int asyncmsgq_work(pa_rtpoll_item *i) { +static int asyncmsgq_read_work(pa_rtpoll_item *i) { pa_msgobject *object; int code; void *data; @@ -703,7 +703,7 @@ static int asyncmsgq_work(pa_rtpoll_item *i) { return 0; } -pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) { +pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) { pa_rtpoll_item *i; struct pollfd *pollfd; @@ -713,12 +713,47 @@ pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t i = pa_rtpoll_item_new(p, prio, 1); pollfd = pa_rtpoll_item_get_pollfd(i, NULL); - pollfd->fd = pa_asyncmsgq_get_fd(q); + pollfd->fd = pa_asyncmsgq_read_fd(q); pollfd->events = POLLIN; - i->before_cb = asyncmsgq_before; - i->after_cb = asyncmsgq_after; - i->work_cb = asyncmsgq_work; + i->before_cb = asyncmsgq_read_before; + i->after_cb = asyncmsgq_read_after; + i->work_cb = asyncmsgq_read_work; + i->userdata = q; + + return i; +} + +static int asyncmsgq_write_before(pa_rtpoll_item *i) { + pa_assert(i); + + pa_asyncmsgq_write_before_poll(i->userdata); + return 0; +} + +static void asyncmsgq_write_after(pa_rtpoll_item *i) { + pa_assert(i); + + pa_assert((i->pollfd[0].revents & ~POLLIN) == 0); + pa_asyncmsgq_write_after_poll(i->userdata); +} + +pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q) { + pa_rtpoll_item *i; + struct pollfd *pollfd; + + pa_assert(p); + pa_assert(q); + + i = pa_rtpoll_item_new(p, prio, 1); + + pollfd = pa_rtpoll_item_get_pollfd(i, NULL); + pollfd->fd = pa_asyncmsgq_write_fd(q); + pollfd->events = POLLIN; + + i->before_cb = asyncmsgq_write_before; + i->after_cb = asyncmsgq_write_after; + i->work_cb = NULL; i->userdata = q; return i; diff --git a/src/pulsecore/rtpoll.h b/src/pulsecore/rtpoll.h index f7f96e67..6d72eb54 100644 --- a/src/pulsecore/rtpoll.h +++ b/src/pulsecore/rtpoll.h @@ -106,7 +106,8 @@ void pa_rtpoll_item_set_userdata(pa_rtpoll_item *i, void *userdata); void* pa_rtpoll_item_get_userdata(pa_rtpoll_item *i); pa_rtpoll_item *pa_rtpoll_item_new_fdsem(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_fdsem *s); -pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q); +pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_read(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q); +pa_rtpoll_item *pa_rtpoll_item_new_asyncmsgq_write(pa_rtpoll *p, pa_rtpoll_priority_t prio, pa_asyncmsgq *q); /* Requests the loop to exit. Will cause the next iteration of * pa_rtpoll_run() to return 0 */ diff --git a/src/pulsecore/thread-mq.c b/src/pulsecore/thread-mq.c index 9b879425..7e39c577 100644 --- a/src/pulsecore/thread-mq.c +++ b/src/pulsecore/thread-mq.c @@ -43,15 +43,15 @@ PA_STATIC_TLS_DECLARE_NO_FREE(thread_mq); -static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { +static void asyncmsgq_read_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { pa_thread_mq *q = userdata; pa_asyncmsgq *aq; - pa_assert(pa_asyncmsgq_get_fd(q->outq) == fd); + pa_assert(pa_asyncmsgq_read_fd(q->outq) == fd); pa_assert(events == PA_IO_EVENT_INPUT); pa_asyncmsgq_ref(aq = q->outq); - pa_asyncmsgq_after_poll(aq); + pa_asyncmsgq_write_after_poll(aq); for (;;) { pa_msgobject *object; @@ -68,14 +68,24 @@ static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_even pa_asyncmsgq_done(aq, ret); } - if (pa_asyncmsgq_before_poll(aq) == 0) + if (pa_asyncmsgq_read_before_poll(aq) == 0) break; } pa_asyncmsgq_unref(aq); } -void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) { +static void asyncmsgq_write_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { + pa_thread_mq *q = userdata; + + pa_assert(pa_asyncmsgq_write_fd(q->inq) == fd); + pa_assert(events == PA_IO_EVENT_INPUT); + + pa_asyncmsgq_write_after_poll(q->inq); + pa_asyncmsgq_write_before_poll(q->inq); +} + +void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll) { pa_assert(q); pa_assert(mainloop); @@ -83,15 +93,22 @@ void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) { pa_assert_se(q->inq = pa_asyncmsgq_new(0)); pa_assert_se(q->outq = pa_asyncmsgq_new(0)); - pa_assert_se(pa_asyncmsgq_before_poll(q->outq) == 0); - pa_assert_se(q->io_event = mainloop->io_new(mainloop, pa_asyncmsgq_get_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_cb, q)); + pa_assert_se(pa_asyncmsgq_read_before_poll(q->outq) == 0); + pa_assert_se(q->read_event = mainloop->io_new(mainloop, pa_asyncmsgq_read_fd(q->outq), PA_IO_EVENT_INPUT, asyncmsgq_read_cb, q)); + + pa_asyncmsgq_write_before_poll(q->inq); + pa_assert_se(q->write_event = mainloop->io_new(mainloop, pa_asyncmsgq_write_fd(q->inq), PA_IO_EVENT_INPUT, asyncmsgq_write_cb, q)); + + pa_rtpoll_item_new_asyncmsgq_read(rtpoll, PA_RTPOLL_EARLY, q->inq); + pa_rtpoll_item_new_asyncmsgq_write(rtpoll, PA_RTPOLL_LATE, q->outq); } void pa_thread_mq_done(pa_thread_mq *q) { pa_assert(q); - q->mainloop->io_free(q->io_event); - q->io_event = NULL; + q->mainloop->io_free(q->read_event); + q->mainloop->io_free(q->write_event); + q->read_event = q->write_event = NULL; pa_asyncmsgq_unref(q->inq); pa_asyncmsgq_unref(q->outq); diff --git a/src/pulsecore/thread-mq.h b/src/pulsecore/thread-mq.h index 13b6e01f..0ae49f8c 100644 --- a/src/pulsecore/thread-mq.h +++ b/src/pulsecore/thread-mq.h @@ -26,6 +26,7 @@ #include <pulse/mainloop-api.h> #include <pulsecore/asyncmsgq.h> +#include <pulsecore/rtpoll.h> /* Two way communication between a thread and a mainloop. Before the * thread is started a pa_pthread_mq should be initialized and than @@ -34,10 +35,10 @@ typedef struct pa_thread_mq { pa_mainloop_api *mainloop; pa_asyncmsgq *inq, *outq; - pa_io_event *io_event; + pa_io_event *read_event, *write_event; } pa_thread_mq; -void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop); +void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop, pa_rtpoll *rtpoll); void pa_thread_mq_done(pa_thread_mq *q); /* Install the specified pa_thread_mq object for the current thread */ |