From 45700da4ebf2986126d7a4d6889c028599fd3085 Mon Sep 17 00:00:00 2001 From: Pierre Ossman Date: Fri, 17 Feb 2006 15:42:47 +0000 Subject: Have a memblock queue on the client side during recording. This makes the record callback optional in stead of mandatory. For applications that wish to retain the old behaviour, simply call pa_stream_peek() followed by pa_stream_drop() in the callback. git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@507 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/Makefile.am | 1 + src/polyp/context.c | 10 ++++----- src/polyp/internal.h | 5 ++++- src/polyp/stream.c | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++- src/polyp/stream.h | 22 +++++++++++++++++-- src/utils/pacat.c | 10 +++++++-- 6 files changed, 98 insertions(+), 11 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index 1be713ce..f973dc46 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -330,6 +330,7 @@ libpolyp_@PA_MAJORMINOR@_la_SOURCES += \ polypcore/log.c polypcore/log.h \ polypcore/mcalign.c polypcore/mcalign.h \ polypcore/memblock.c polypcore/memblock.h \ + polypcore/memblockq.c polypcore/memblockq.h \ polypcore/memchunk.c polypcore/memchunk.h \ polypcore/native-common.h \ polypcore/packet.c polypcore/packet.h \ diff --git a/src/polyp/context.c b/src/polyp/context.c index b7f7eb99..c40041c5 100644 --- a/src/polyp/context.c +++ b/src/polyp/context.c @@ -273,11 +273,11 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, PA_GCC_UN if (pa_mcalign_pop(s->mcalign, &t) < 0) break; - - if (s->read_callback) { - s->read_callback(s, (uint8_t*) t.memblock->data + t.index, t.length, s->read_userdata); - s->counter += chunk->length; - } + + assert(s->record_memblockq); + pa_memblockq_push(s->record_memblockq, &t, t.length); + if (s->read_callback) + s->read_callback(s, pa_stream_readable_size(s), s->read_userdata); pa_memblock_unref(t.memblock); } diff --git a/src/polyp/internal.h b/src/polyp/internal.h index 762688c9..7f4d38ac 100644 --- a/src/polyp/internal.h +++ b/src/polyp/internal.h @@ -35,6 +35,7 @@ #include #include #include +#include #include "client-conf.h" @@ -98,6 +99,8 @@ struct pa_stream { pa_usec_t previous_ipol_time; pa_stream_state_t state; pa_mcalign *mcalign; + pa_memchunk peek_memchunk; + pa_memblockq *record_memblockq; int interpolate; int corked; @@ -110,7 +113,7 @@ struct pa_stream { void (*state_callback)(pa_stream*c, void *userdata); void *state_userdata; - void (*read_callback)(pa_stream *p, const void*data, size_t length, void *userdata); + void (*read_callback)(pa_stream *p, size_t length, void *userdata); void *read_userdata; void (*write_callback)(pa_stream *p, size_t length, void *userdata); diff --git a/src/polyp/stream.c b/src/polyp/stream.c index 6e5bc067..fef3c00f 100644 --- a/src/polyp/stream.c +++ b/src/polyp/stream.c @@ -103,6 +103,12 @@ static void stream_free(pa_stream *s) { s->mainloop->time_free(s->ipol_event); } + if (s->peek_memchunk.memblock) + pa_memblock_unref(s->peek_memchunk.memblock); + + if (s->record_memblockq) + pa_memblockq_free(s->record_memblockq); + pa_mcalign_free(s->mcalign); pa_xfree(s->name); @@ -263,6 +269,13 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED goto finish; } + if (s->direction == PA_STREAM_RECORD) { + assert(!s->record_memblockq); + s->record_memblockq = pa_memblockq_new(s->buffer_attr.maxlength, 0, + pa_frame_size(&s->sample_spec), 0, 0, s->context->memblock_stat); + assert(s->record_memblockq); + } + s->channel_valid = 1; pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s); pa_stream_set_state(s, PA_STREAM_READY); @@ -391,11 +404,57 @@ void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_ s->counter += length; } +void pa_stream_peek(pa_stream *s, void **data, size_t *length) { + assert(s && s->record_memblockq && data && length && s->state == PA_STREAM_READY && s->ref >= 1); + + if (!s->peek_memchunk.memblock) { + *data = NULL; + *length = 0; + + if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) + return; + + pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length); + } + + *data = (char*)s->peek_memchunk.memblock->data + s->peek_memchunk.index; + *length = s->peek_memchunk.length; +} + +void pa_stream_drop(pa_stream *s) { + assert(s && s->peek_memchunk.memblock && s->state == PA_STREAM_READY && s->ref >= 1); + + s->counter += s->peek_memchunk.length; + + pa_memblock_unref(s->peek_memchunk.memblock); + + s->peek_memchunk.length = 0; + s->peek_memchunk.memblock = NULL; +} + size_t pa_stream_writable_size(pa_stream *s) { assert(s && s->ref >= 1); return s->state == PA_STREAM_READY ? s->requested_bytes : 0; } +size_t pa_stream_readable_size(pa_stream *s) { + size_t sz; + + assert(s && s->ref >= 1); + + if (s->state != PA_STREAM_READY) + return 0; + + assert(s->record_memblockq); + + sz = (size_t)pa_memblockq_get_length(s->record_memblockq); + + if (s->peek_memchunk.memblock) + sz += s->peek_memchunk.length; + + return sz; +} + pa_operation * pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata) { pa_operation *o; pa_tagstruct *t; @@ -554,7 +613,7 @@ void pa_stream_disconnect(pa_stream *s) { pa_stream_unref(s); } -void pa_stream_set_read_callback(pa_stream *s, void (*cb)(pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) { +void pa_stream_set_read_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) { assert(s && s->ref >= 1); s->read_callback = cb; s->read_userdata = userdata; diff --git a/src/polyp/stream.h b/src/polyp/stream.h index e20cfdd4..9bbda436 100644 --- a/src/polyp/stream.h +++ b/src/polyp/stream.h @@ -106,9 +106,25 @@ void pa_stream_write(pa_stream *p /**< The stream to use */, value is ignored on upload streams. */); +/** Read the next fragment from the buffer (for capture sources). + * data will point to the actual data and length will contain the size + * of the data in bytes (which can be less than a complete framgnet). + * Use pa_stream_drop() to actually remove the data from the buffer. + * \since 0.8 */ +void pa_stream_peek(pa_stream *p /**< The stream to use */, + void **data /**< Pointer to pointer that will point to data */, + size_t *length /**< The length of the data read */); + +/** Remove the current fragment. It is invalid to do this without first + * calling pa_stream_peek(). \since 0.8 */ +void pa_stream_drop(pa_stream *p); + /** Return the amount of bytes that may be written using pa_stream_write() */ size_t pa_stream_writable_size(pa_stream *p); +/** Return the ammount of bytes that may be read using pa_stream_read() \since 0.8 */ +size_t pa_stream_readable_size(pa_stream *p); + /** Drain a playback stream */ pa_operation* pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata); @@ -122,8 +138,10 @@ void pa_stream_set_state_callback(pa_stream *s, void (*cb)(pa_stream *s, void *u * written to the stream. */ void pa_stream_set_write_callback(pa_stream *p, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata); -/** Set the callback function that is called when new data is available from the stream */ -void pa_stream_set_read_callback(pa_stream *p, void (*cb)(pa_stream *p, const void*data, size_t length, void *userdata), void *userdata); +/** Set the callback function that is called when new data is available from the stream. + * Return the number of bytes read. \since 0.8 + */ +void pa_stream_set_read_callback(pa_stream *p, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata); /** Pause (or resume) playback of this stream temporarily. Available on both playback and recording streams. \since 0.3 */ pa_operation* pa_stream_cork(pa_stream *s, int b, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata); diff --git a/src/utils/pacat.c b/src/utils/pacat.c index 3c50402f..a3c3f2c8 100644 --- a/src/utils/pacat.c +++ b/src/utils/pacat.c @@ -105,14 +105,19 @@ static void stream_write_callback(pa_stream *s, size_t length, void *userdata) { } /* This is called whenever new data may is available */ -static void stream_read_callback(pa_stream *s, const void*data, size_t length, void *userdata) { - assert(s && data && length); +static void stream_read_callback(pa_stream *s, size_t length, void *userdata) { + assert(s && length); + void *data; if (stdio_event) mainloop_api->io_enable(stdio_event, PA_IO_EVENT_OUTPUT); + pa_stream_peek(s, &data, &length); + assert(data && length); + if (buffer) { fprintf(stderr, "Buffer overrun, dropping incoming data\n"); + pa_stream_drop(s); return; } @@ -120,6 +125,7 @@ static void stream_read_callback(pa_stream *s, const void*data, size_t length, v assert(buffer); memcpy(buffer, data, length); buffer_index = 0; + pa_stream_drop(s); } /* This routine is called whenever the stream state changes */ -- cgit