From 23039af8427140bafa92dce3021c1f2a14bcfbad Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Mon, 20 Jul 2009 15:49:33 +0100 Subject: client: allow zero-copy writing to the stream --- src/pulse/stream.c | 145 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 108 insertions(+), 37 deletions(-) (limited to 'src/pulse/stream.c') diff --git a/src/pulse/stream.c b/src/pulse/stream.c index 40556329..5baf5c2c 100644 --- a/src/pulse/stream.c +++ b/src/pulse/stream.c @@ -144,12 +144,13 @@ pa_stream *pa_stream_new_with_proplist( s->suspended = FALSE; s->corked = FALSE; + s->write_memblock = NULL; + s->write_data = NULL; + pa_memchunk_reset(&s->peek_memchunk); s->peek_data = NULL; - s->record_memblockq = NULL; - memset(&s->timing_info, 0, sizeof(s->timing_info)); s->timing_info_valid = FALSE; @@ -221,6 +222,11 @@ static void stream_free(pa_stream *s) { stream_unlink(s); + if (s->write_memblock) { + pa_memblock_release(s->write_memblock); + pa_memblock_unref(s->write_data); + } + if (s->peek_memchunk.memblock) { if (s->peek_data) pa_memblock_release(s->peek_memchunk.memblock); @@ -1187,20 +1193,60 @@ int pa_stream_connect_record( return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL); } +int pa_stream_begin_write( + pa_stream *s, + void **data, + size_t *nbytes) { + + pa_assert(s); + pa_assert(PA_REFCNT_VALUE(s) >= 1); + + PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); + PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID); + PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID); + + if (!s->write_memblock) { + s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes); + s->write_data = pa_memblock_acquire(s->write_memblock); + } + + *data = s->write_data; + *nbytes = pa_memblock_get_length(s->write_memblock); + + return 0; +} + +int pa_stream_cancel_write( + pa_stream *s) { + + pa_assert(s); + pa_assert(PA_REFCNT_VALUE(s) >= 1); + + PA_CHECK_VALIDITY(s->context, !pa_detect_fork(), PA_ERR_FORKED); + PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->write_memblock, PA_ERR_BADSTATE); + + pa_assert(s->write_data); + + pa_memblock_release(s->write_memblock); + pa_memblock_unref(s->write_memblock); + s->write_memblock = NULL; + s->write_data = NULL; + + return 0; +} + int pa_stream_write( pa_stream *s, const void *data, size_t length, - void (*free_cb)(void *p), + pa_free_cb_t free_cb, int64_t offset, pa_seek_mode_t seek) { - pa_memchunk chunk; - pa_seek_mode_t t_seek; - int64_t t_offset; - size_t t_length; - const void *t_data; - pa_assert(s); pa_assert(PA_REFCNT_VALUE(s) >= 1); pa_assert(data); @@ -1210,46 +1256,71 @@ int pa_stream_write( PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID); + PA_CHECK_VALIDITY(s->context, + !s->write_memblock || + ((data >= s->write_data) && + ((const char*) data + length <= (const char*) s->write_data + pa_memblock_get_length(s->write_memblock))), + PA_ERR_INVALID); + PA_CHECK_VALIDITY(s->context, !free_cb || !s->write_memblock, PA_ERR_INVALID); - if (length <= 0) - return 0; + if (s->write_memblock) { + pa_memchunk chunk; - t_seek = seek; - t_offset = offset; - t_length = length; - t_data = data; + /* pa_stream_write_begin() was called before */ - while (t_length > 0) { + pa_memblock_release(s->write_memblock); - chunk.index = 0; + chunk.memblock = s->write_memblock; + chunk.index = (const char *) data - (const char *) s->write_data; + chunk.length = length; - if (free_cb && !pa_pstream_get_shm(s->context->pstream)) { - chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1); - chunk.length = t_length; - } else { - void *d; + s->write_memblock = NULL; + s->write_data = NULL; - chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool)); - chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length); + pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk); + pa_memblock_unref(chunk.memblock); - d = pa_memblock_acquire(chunk.memblock); - memcpy(d, t_data, chunk.length); - pa_memblock_release(chunk.memblock); - } + } else { + pa_seek_mode_t t_seek = seek; + int64_t t_offset = offset; + size_t t_length = length; + const void *t_data = data; - pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk); + /* pa_stream_write_begin() was not called before */ - t_offset = 0; - t_seek = PA_SEEK_RELATIVE; + while (t_length > 0) { + pa_memchunk chunk; - t_data = (const uint8_t*) t_data + chunk.length; - t_length -= chunk.length; + chunk.index = 0; - pa_memblock_unref(chunk.memblock); - } + if (free_cb && !pa_pstream_get_shm(s->context->pstream)) { + chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) t_data, t_length, free_cb, 1); + chunk.length = t_length; + } else { + void *d; + + chunk.length = PA_MIN(t_length, pa_mempool_block_size_max(s->context->mempool)); + chunk.memblock = pa_memblock_new(s->context->mempool, chunk.length); + + d = pa_memblock_acquire(chunk.memblock); + memcpy(d, t_data, chunk.length); + pa_memblock_release(chunk.memblock); + } - if (free_cb && pa_pstream_get_shm(s->context->pstream)) - free_cb((void*) data); + pa_pstream_send_memblock(s->context->pstream, s->channel, t_offset, t_seek, &chunk); + + t_offset = 0; + t_seek = PA_SEEK_RELATIVE; + + t_data = (const uint8_t*) t_data + chunk.length; + t_length -= chunk.length; + + pa_memblock_unref(chunk.memblock); + } + + if (free_cb && pa_pstream_get_shm(s->context->pstream)) + free_cb((void*) data); + } /* This is obviously wrong since we ignore the seeking index . But * that's OK, the server side applies the same error */ -- cgit From 5efb07281d5be1cddaed5b0610325fedd7f599ec Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 24 Jul 2009 20:13:52 +0200 Subject: alsa: throw timing data away after device resume --- src/pulse/stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/pulse/stream.c') diff --git a/src/pulse/stream.c b/src/pulse/stream.c index 5baf5c2c..a22816ae 100644 --- a/src/pulse/stream.c +++ b/src/pulse/stream.c @@ -827,7 +827,7 @@ static void create_stream_complete(pa_stream *s) { if (s->flags & PA_STREAM_AUTO_TIMING_UPDATE) { s->auto_timing_interval_usec = AUTO_TIMING_INTERVAL_START_USEC; pa_assert(!s->auto_timing_update_event); - s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s); + s->auto_timing_update_event = pa_context_rttime_new(s->context, pa_rtclock_now() + s->auto_timing_interval_usec, &auto_timing_update_callback, s); request_auto_timing_update(s, TRUE); } -- cgit From e7ca058427e795b9c2b436c4a819e8d8354ee9de Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 24 Jul 2009 20:20:34 +0200 Subject: client: make volume struct const --- src/pulse/stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/pulse/stream.c') diff --git a/src/pulse/stream.c b/src/pulse/stream.c index a22816ae..bbd01499 100644 --- a/src/pulse/stream.c +++ b/src/pulse/stream.c @@ -1172,7 +1172,7 @@ int pa_stream_connect_playback( const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, - pa_cvolume *volume, + const pa_cvolume *volume, pa_stream *sync_stream) { pa_assert(s); -- cgit From 211d0f3dcb23c6d63dd6a212826a872ce972e4ee Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 24 Jul 2009 20:21:30 +0200 Subject: client: limit block size for zero-copy operations to mempool block size --- src/pulse/stream.c | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'src/pulse/stream.c') diff --git a/src/pulse/stream.c b/src/pulse/stream.c index bbd01499..72d49e11 100644 --- a/src/pulse/stream.c +++ b/src/pulse/stream.c @@ -1207,6 +1207,17 @@ int pa_stream_begin_write( PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID); + if (*nbytes != (size_t) -1) { + size_t m, fs; + + m = pa_mempool_block_size_max(s->context->mempool); + fs = pa_frame_size(&s->sample_spec); + + m = (m / fs) * fs; + if (*nbytes > m) + *nbytes = m; + } + if (!s->write_memblock) { s->write_memblock = pa_memblock_new(s->context->mempool, *nbytes); s->write_data = pa_memblock_acquire(s->write_memblock); -- cgit From d8a90a390041b5603a7caacaaea8296fa76363bc Mon Sep 17 00:00:00 2001 From: Marc-André Lureau Date: Mon, 20 Jul 2009 13:53:17 +0300 Subject: pulse: even in case of record stream, let's initialize req_bytes to 0 --- src/pulse/stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/pulse/stream.c') diff --git a/src/pulse/stream.c b/src/pulse/stream.c index 72d49e11..2bc2b1e4 100644 --- a/src/pulse/stream.c +++ b/src/pulse/stream.c @@ -867,7 +867,7 @@ static void automatic_buffer_attr(pa_stream *s, pa_buffer_attr *attr, const pa_s void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { pa_stream *s = userdata; - uint32_t requested_bytes; + uint32_t requested_bytes = 0; pa_assert(pd); pa_assert(s); -- cgit