summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am1
-rw-r--r--src/polyp/context.c10
-rw-r--r--src/polyp/internal.h5
-rw-r--r--src/polyp/stream.c61
-rw-r--r--src/polyp/stream.h22
-rw-r--r--src/utils/pacat.c10
6 files changed, 98 insertions, 11 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 1be713ce..f973dc46 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -330,6 +330,7 @@ libpolyp_@PA_MAJORMINOR@_la_SOURCES += \
polypcore/log.c polypcore/log.h \
polypcore/mcalign.c polypcore/mcalign.h \
polypcore/memblock.c polypcore/memblock.h \
+ polypcore/memblockq.c polypcore/memblockq.h \
polypcore/memchunk.c polypcore/memchunk.h \
polypcore/native-common.h \
polypcore/packet.c polypcore/packet.h \
diff --git a/src/polyp/context.c b/src/polyp/context.c
index b7f7eb99..c40041c5 100644
--- a/src/polyp/context.c
+++ b/src/polyp/context.c
@@ -273,11 +273,11 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, PA_GCC_UN
if (pa_mcalign_pop(s->mcalign, &t) < 0)
break;
-
- if (s->read_callback) {
- s->read_callback(s, (uint8_t*) t.memblock->data + t.index, t.length, s->read_userdata);
- s->counter += chunk->length;
- }
+
+ assert(s->record_memblockq);
+ pa_memblockq_push(s->record_memblockq, &t, t.length);
+ if (s->read_callback)
+ s->read_callback(s, pa_stream_readable_size(s), s->read_userdata);
pa_memblock_unref(t.memblock);
}
diff --git a/src/polyp/internal.h b/src/polyp/internal.h
index 762688c9..7f4d38ac 100644
--- a/src/polyp/internal.h
+++ b/src/polyp/internal.h
@@ -35,6 +35,7 @@
#include <polypcore/native-common.h>
#include <polypcore/strlist.h>
#include <polypcore/mcalign.h>
+#include <polypcore/memblockq.h>
#include "client-conf.h"
@@ -98,6 +99,8 @@ struct pa_stream {
pa_usec_t previous_ipol_time;
pa_stream_state_t state;
pa_mcalign *mcalign;
+ pa_memchunk peek_memchunk;
+ pa_memblockq *record_memblockq;
int interpolate;
int corked;
@@ -110,7 +113,7 @@ struct pa_stream {
void (*state_callback)(pa_stream*c, void *userdata);
void *state_userdata;
- void (*read_callback)(pa_stream *p, const void*data, size_t length, void *userdata);
+ void (*read_callback)(pa_stream *p, size_t length, void *userdata);
void *read_userdata;
void (*write_callback)(pa_stream *p, size_t length, void *userdata);
diff --git a/src/polyp/stream.c b/src/polyp/stream.c
index 6e5bc067..fef3c00f 100644
--- a/src/polyp/stream.c
+++ b/src/polyp/stream.c
@@ -103,6 +103,12 @@ static void stream_free(pa_stream *s) {
s->mainloop->time_free(s->ipol_event);
}
+ if (s->peek_memchunk.memblock)
+ pa_memblock_unref(s->peek_memchunk.memblock);
+
+ if (s->record_memblockq)
+ pa_memblockq_free(s->record_memblockq);
+
pa_mcalign_free(s->mcalign);
pa_xfree(s->name);
@@ -263,6 +269,13 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
goto finish;
}
+ if (s->direction == PA_STREAM_RECORD) {
+ assert(!s->record_memblockq);
+ s->record_memblockq = pa_memblockq_new(s->buffer_attr.maxlength, 0,
+ pa_frame_size(&s->sample_spec), 0, 0, s->context->memblock_stat);
+ assert(s->record_memblockq);
+ }
+
s->channel_valid = 1;
pa_dynarray_put((s->direction == PA_STREAM_RECORD) ? s->context->record_streams : s->context->playback_streams, s->channel, s);
pa_stream_set_state(s, PA_STREAM_READY);
@@ -391,11 +404,57 @@ void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_
s->counter += length;
}
+void pa_stream_peek(pa_stream *s, void **data, size_t *length) {
+ assert(s && s->record_memblockq && data && length && s->state == PA_STREAM_READY && s->ref >= 1);
+
+ if (!s->peek_memchunk.memblock) {
+ *data = NULL;
+ *length = 0;
+
+ if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0)
+ return;
+
+ pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length);
+ }
+
+ *data = (char*)s->peek_memchunk.memblock->data + s->peek_memchunk.index;
+ *length = s->peek_memchunk.length;
+}
+
+void pa_stream_drop(pa_stream *s) {
+ assert(s && s->peek_memchunk.memblock && s->state == PA_STREAM_READY && s->ref >= 1);
+
+ s->counter += s->peek_memchunk.length;
+
+ pa_memblock_unref(s->peek_memchunk.memblock);
+
+ s->peek_memchunk.length = 0;
+ s->peek_memchunk.memblock = NULL;
+}
+
size_t pa_stream_writable_size(pa_stream *s) {
assert(s && s->ref >= 1);
return s->state == PA_STREAM_READY ? s->requested_bytes : 0;
}
+size_t pa_stream_readable_size(pa_stream *s) {
+ size_t sz;
+
+ assert(s && s->ref >= 1);
+
+ if (s->state != PA_STREAM_READY)
+ return 0;
+
+ assert(s->record_memblockq);
+
+ sz = (size_t)pa_memblockq_get_length(s->record_memblockq);
+
+ if (s->peek_memchunk.memblock)
+ sz += s->peek_memchunk.length;
+
+ return sz;
+}
+
pa_operation * pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata) {
pa_operation *o;
pa_tagstruct *t;
@@ -554,7 +613,7 @@ void pa_stream_disconnect(pa_stream *s) {
pa_stream_unref(s);
}
-void pa_stream_set_read_callback(pa_stream *s, void (*cb)(pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) {
+void pa_stream_set_read_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) {
assert(s && s->ref >= 1);
s->read_callback = cb;
s->read_userdata = userdata;
diff --git a/src/polyp/stream.h b/src/polyp/stream.h
index e20cfdd4..9bbda436 100644
--- a/src/polyp/stream.h
+++ b/src/polyp/stream.h
@@ -106,9 +106,25 @@ void pa_stream_write(pa_stream *p /**< The stream to use */,
value is ignored on
upload streams. */);
+/** Read the next fragment from the buffer (for capture sources).
+ * data will point to the actual data and length will contain the size
+ * of the data in bytes (which can be less than a complete framgnet).
+ * Use pa_stream_drop() to actually remove the data from the buffer.
+ * \since 0.8 */
+void pa_stream_peek(pa_stream *p /**< The stream to use */,
+ void **data /**< Pointer to pointer that will point to data */,
+ size_t *length /**< The length of the data read */);
+
+/** Remove the current fragment. It is invalid to do this without first
+ * calling pa_stream_peek(). \since 0.8 */
+void pa_stream_drop(pa_stream *p);
+
/** Return the amount of bytes that may be written using pa_stream_write() */
size_t pa_stream_writable_size(pa_stream *p);
+/** Return the ammount of bytes that may be read using pa_stream_read() \since 0.8 */
+size_t pa_stream_readable_size(pa_stream *p);
+
/** Drain a playback stream */
pa_operation* pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata);
@@ -122,8 +138,10 @@ void pa_stream_set_state_callback(pa_stream *s, void (*cb)(pa_stream *s, void *u
* written to the stream. */
void pa_stream_set_write_callback(pa_stream *p, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata);
-/** Set the callback function that is called when new data is available from the stream */
-void pa_stream_set_read_callback(pa_stream *p, void (*cb)(pa_stream *p, const void*data, size_t length, void *userdata), void *userdata);
+/** Set the callback function that is called when new data is available from the stream.
+ * Return the number of bytes read. \since 0.8
+ */
+void pa_stream_set_read_callback(pa_stream *p, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata);
/** Pause (or resume) playback of this stream temporarily. Available on both playback and recording streams. \since 0.3 */
pa_operation* pa_stream_cork(pa_stream *s, int b, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata);
diff --git a/src/utils/pacat.c b/src/utils/pacat.c
index 3c50402f..a3c3f2c8 100644
--- a/src/utils/pacat.c
+++ b/src/utils/pacat.c
@@ -105,14 +105,19 @@ static void stream_write_callback(pa_stream *s, size_t length, void *userdata) {
}
/* This is called whenever new data may is available */
-static void stream_read_callback(pa_stream *s, const void*data, size_t length, void *userdata) {
- assert(s && data && length);
+static void stream_read_callback(pa_stream *s, size_t length, void *userdata) {
+ assert(s && length);
+ void *data;
if (stdio_event)
mainloop_api->io_enable(stdio_event, PA_IO_EVENT_OUTPUT);
+ pa_stream_peek(s, &data, &length);
+ assert(data && length);
+
if (buffer) {
fprintf(stderr, "Buffer overrun, dropping incoming data\n");
+ pa_stream_drop(s);
return;
}
@@ -120,6 +125,7 @@ static void stream_read_callback(pa_stream *s, const void*data, size_t length, v
assert(buffer);
memcpy(buffer, data, length);
buffer_index = 0;
+ pa_stream_drop(s);
}
/* This routine is called whenever the stream state changes */