summaryrefslogtreecommitdiffstats
path: root/src/polyp/stream.c
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2006-02-20 04:05:16 +0000
committerLennart Poettering <lennart@poettering.net>2006-02-20 04:05:16 +0000
commit304449002cbc84fdcf235b5dfaec891278dd7085 (patch)
tree2a2d00e34d5c620835b76a0d6f7890a1d3e9fb97 /src/polyp/stream.c
parent0876b1ba82ea9c988df90ca98d202765ac697313 (diff)
1) Add flexible seeking support (including absolute) for memory block queues and playback streams
2) Add support to synchronize multiple playback streams 3) add two tests for 1) and 2) 4) s/PA_ERROR/PA_ERR/ 5) s/PA_ERROR_OK/PA_OK/ 6) update simple API to deal properly with new peek/drop recording API 7) add beginnings of proper validity checking on API calls in client libs (needs to be extended) 8) report playback buffer overflows/underflows to the client 9) move client side recording mcalign stuff into the memblockq 10) create typedefs for a bunch of API callback prototypes 11) simplify handling of HUP poll() events Yes, i know, it's usually better to commit a lot of small patches instead of a single big one. In this case however, this would have contradicted the other rule: never commit broken or incomplete stuff. *** This stuff needs a lot of additional testing! *** git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@511 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src/polyp/stream.c')
-rw-r--r--src/polyp/stream.c366
1 files changed, 258 insertions, 108 deletions
diff --git a/src/polyp/stream.c b/src/polyp/stream.c
index 35de2d01..5497f0c4 100644
--- a/src/polyp/stream.c
+++ b/src/polyp/stream.c
@@ -28,6 +28,7 @@
#include <stdio.h>
#include <string.h>
+#include <polyp/def.h>
#include <polypcore/xmalloc.h>
#include <polypcore/pstream-util.h>
#include <polypcore/util.h>
@@ -39,14 +40,11 @@
pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) {
pa_stream *s;
+
assert(c);
- assert(ss);
-
- if (!pa_sample_spec_valid(ss))
- return NULL;
- if (map && !pa_channel_map_valid(map))
- return NULL;
+ PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID);
+ PA_CHECK_VALIDITY_RETURN_NULL(c, !map || pa_channel_map_valid(map), PA_ERR_INVALID);
s = pa_xnew(pa_stream, 1);
s->ref = 1;
@@ -59,6 +57,10 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *
s->write_userdata = NULL;
s->state_callback = NULL;
s->state_userdata = NULL;
+ s->overflow_callback = NULL;
+ s->overflow_userdata = NULL;
+ s->underflow_callback = NULL;
+ s->underflow_userdata = NULL;
s->direction = PA_STREAM_NODIRECTION;
s->name = pa_xstrdup(name);
@@ -71,13 +73,13 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *
s->channel = 0;
s->channel_valid = 0;
+ s->syncid = c->csyncid++;
s->device_index = PA_INVALID_INDEX;
s->requested_bytes = 0;
- s->state = PA_STREAM_DISCONNECTED;
+ s->state = PA_STREAM_UNCONNECTED;
memset(&s->buffer_attr, 0, sizeof(s->buffer_attr));
- s->mcalign = pa_mcalign_new(pa_frame_size(ss), c->memblock_stat);
-
+ s->peek_memchunk.index = 0;
s->peek_memchunk.length = 0;
s->peek_memchunk.memblock = NULL;
@@ -114,42 +116,52 @@ static void stream_free(pa_stream *s) {
if (s->record_memblockq)
pa_memblockq_free(s->record_memblockq);
- pa_mcalign_free(s->mcalign);
-
pa_xfree(s->name);
pa_xfree(s);
}
void pa_stream_unref(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
if (--(s->ref) == 0)
stream_free(s);
}
pa_stream* pa_stream_ref(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
+
s->ref++;
return s;
}
pa_stream_state_t pa_stream_get_state(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
+
return s->state;
}
pa_context* pa_stream_get_context(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
+
return s->context;
}
uint32_t pa_stream_get_index(pa_stream *s) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX);
+
return s->device_index;
}
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
- assert(s && s->ref >= 1);
+ assert(s);
+ assert(s->ref >= 1);
if (s->state == st)
return;
@@ -159,6 +171,8 @@ void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) {
s->state = st;
if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED) && s->context) {
+ /* Detach from context */
+
if (s->channel_valid)
pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL);
@@ -182,14 +196,14 @@ void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
if (pa_tagstruct_getu32(t, &channel) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
goto finish;
}
if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel)))
goto finish;
- c->error = PA_ERROR_KILLED;
+ c->error = PA_ERR_KILLED;
pa_stream_set_state(s, PA_STREAM_FAILED);
finish:
@@ -207,24 +221,55 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32
if (pa_tagstruct_getu32(t, &channel) < 0 ||
pa_tagstruct_getu32(t, &bytes) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(c, PA_ERROR_PROTOCOL);
+ pa_context_fail(c, PA_ERR_PROTOCOL);
goto finish;
}
if (!(s = pa_dynarray_get(c->playback_streams, channel)))
goto finish;
- if (s->state != PA_STREAM_READY)
- goto finish;
+ if (s->state == PA_STREAM_READY) {
+ s->requested_bytes += bytes;
+
+ if (s->requested_bytes > 0 && s->write_callback)
+ s->write_callback(s, s->requested_bytes, s->write_userdata);
+ }
- pa_stream_ref(s);
+finish:
+ pa_context_unref(c);
+}
+
+void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
+ pa_stream *s;
+ pa_context *c = userdata;
+ uint32_t channel;
+
+ assert(pd);
+ assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW);
+ assert(t);
+ assert(c);
+
+ pa_context_ref(c);
+
+ if (pa_tagstruct_getu32(t, &channel) < 0 ||
+ !pa_tagstruct_eof(t)) {
+ pa_context_fail(c, PA_ERR_PROTOCOL);
+ goto finish;
+ }
- s->requested_bytes += bytes;
+ if (!(s = pa_dynarray_get(c->playback_streams, channel)))
+ goto finish;
- if (s->requested_bytes && s->write_callback)
- s->write_callback(s, s->requested_bytes, s->write_userdata);
+ if (s->state == PA_STREAM_READY) {
- pa_stream_unref(s);
+ if (command == PA_COMMAND_OVERFLOW) {
+ if (s->overflow_callback)
+ s->overflow_callback(s, s->overflow_userdata);
+ } else if (command == PA_COMMAND_UNDERFLOW) {
+ if (s->underflow_callback)
+ s->underflow_callback(s, s->underflow_userdata);
+ }
+ }
finish:
pa_context_unref(c);
@@ -270,14 +315,21 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
((s->direction != PA_STREAM_UPLOAD) && pa_tagstruct_getu32(t, &s->device_index) < 0) ||
((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0) ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(s->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(s->context, PA_ERR_PROTOCOL);
goto finish;
}
if (s->direction == PA_STREAM_RECORD) {
assert(!s->record_memblockq);
- s->record_memblockq = pa_memblockq_new(s->buffer_attr.maxlength, 0,
- pa_frame_size(&s->sample_spec), 0, 0, s->context->memblock_stat);
+ s->record_memblockq = pa_memblockq_new(
+ 0,
+ s->buffer_attr.maxlength,
+ 0,
+ pa_frame_size(&s->sample_spec),
+ 1,
+ 0,
+ NULL,
+ s->context->memblock_stat);
assert(s->record_memblockq);
}
@@ -303,13 +355,32 @@ finish:
pa_stream_unref(s);
}
-static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, const pa_cvolume *volume) {
+static int create_stream(
+ pa_stream_direction_t direction,
+ pa_stream *s,
+ const char *dev,
+ const pa_buffer_attr *attr,
+ pa_stream_flags_t flags,
+ const pa_cvolume *volume,
+ pa_stream *sync_stream) {
+
pa_tagstruct *t;
uint32_t tag;
- assert(s && s->ref >= 1 && s->state == PA_STREAM_DISCONNECTED);
+
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, (flags & ~(PA_STREAM_START_CORKED|PA_STREAM_INTERPOLATE_LATENCY)) == 0, PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || flags == 0, PA_ERR_INVALID);
pa_stream_ref(s);
+ s->direction = direction;
+
+ if (sync_stream)
+ s->syncid = sync_stream->syncid;
+
s->interpolate = !!(flags & PA_STREAM_INTERPOLATE_LATENCY);
pa_stream_trash_ipol(s);
@@ -336,25 +407,28 @@ static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *a
dev = s->context->conf->default_source;
}
- pa_tagstruct_put(t,
- PA_TAG_U32, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM,
- PA_TAG_U32, tag = s->context->ctag++,
- PA_TAG_STRING, s->name,
- PA_TAG_SAMPLE_SPEC, &s->sample_spec,
- PA_TAG_CHANNEL_MAP, &s->channel_map,
- PA_TAG_U32, PA_INVALID_INDEX,
- PA_TAG_STRING, dev,
- PA_TAG_U32, s->buffer_attr.maxlength,
- PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED),
- PA_TAG_INVALID);
+ pa_tagstruct_put(
+ t,
+ PA_TAG_U32, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM,
+ PA_TAG_U32, tag = s->context->ctag++,
+ PA_TAG_STRING, s->name,
+ PA_TAG_SAMPLE_SPEC, &s->sample_spec,
+ PA_TAG_CHANNEL_MAP, &s->channel_map,
+ PA_TAG_U32, PA_INVALID_INDEX,
+ PA_TAG_STRING, dev,
+ PA_TAG_U32, s->buffer_attr.maxlength,
+ PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED),
+ PA_TAG_INVALID);
if (s->direction == PA_STREAM_PLAYBACK) {
pa_cvolume cv;
- pa_tagstruct_put(t,
- PA_TAG_U32, s->buffer_attr.tlength,
- PA_TAG_U32, s->buffer_attr.prebuf,
- PA_TAG_U32, s->buffer_attr.minreq,
- PA_TAG_INVALID);
+ pa_tagstruct_put(
+ t,
+ PA_TAG_U32, s->buffer_attr.tlength,
+ PA_TAG_U32, s->buffer_attr.prebuf,
+ PA_TAG_U32, s->buffer_attr.minreq,
+ PA_TAG_U32, s->syncid,
+ PA_TAG_INVALID);
if (!volume) {
pa_cvolume_reset(&cv, s->sample_spec.channels);
@@ -369,23 +443,57 @@ static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *a
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s);
pa_stream_unref(s);
+ return 0;
}
-void pa_stream_connect_playback(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, pa_cvolume *volume) {
- assert(s && s->context->state == PA_CONTEXT_READY && s->ref >= 1);
- s->direction = PA_STREAM_PLAYBACK;
- create_stream(s, dev, attr, flags, volume);
+int pa_stream_connect_playback(
+ pa_stream *s,
+ const char *dev,
+ const pa_buffer_attr *attr,
+ pa_stream_flags_t flags,
+ pa_cvolume *volume,
+ pa_stream *sync_stream) {
+
+ assert(s);
+ assert(s->ref >= 1);
+
+ return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream);
}
-void pa_stream_connect_record(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags) {
- assert(s && s->context->state == PA_CONTEXT_READY && s->ref >= 1);
- s->direction = PA_STREAM_RECORD;
- create_stream(s, dev, attr, flags, 0);
+int pa_stream_connect_record(
+ pa_stream *s,
+ const char *dev,
+ const pa_buffer_attr *attr,
+ pa_stream_flags_t flags) {
+
+ assert(s);
+ assert(s->ref >= 1);
+
+ return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL);
}
-void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p), size_t delta) {
+int pa_stream_write(
+ pa_stream *s,
+ const void *data,
+ size_t length,
+ void (*free_cb)(void *p),
+ int64_t offset,
+ pa_seek_mode_t seek) {
+
pa_memchunk chunk;
- assert(s && s->context && data && length && s->state == PA_STREAM_READY && s->ref >= 1);
+
+ assert(s);
+ assert(s->ref >= 1);
+ assert(s->context);
+ assert(data);
+
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID);
+
+ if (length <= 0)
+ return 0;
if (free_cb) {
chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat);
@@ -398,7 +506,7 @@ void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_
chunk.index = 0;
chunk.length = length;
- pa_pstream_send_memblock(s->context->pstream, s->channel, delta, &chunk);
+ pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk);
pa_memblock_unref(chunk.memblock);
if (length < s->requested_bytes)
@@ -407,72 +515,87 @@ void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_
s->requested_bytes = 0;
s->counter += length;
+ return 0;
}
-void pa_stream_peek(pa_stream *s, void **data, size_t *length) {
- assert(s && s->record_memblockq && data && length && s->state == PA_STREAM_READY && s->ref >= 1);
+int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
+ assert(s);
+ assert(s->ref >= 1);
+ assert(data);
+ assert(length);
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
+
if (!s->peek_memchunk.memblock) {
- *data = NULL;
- *length = 0;
-
- if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0)
- return;
- pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length);
+ if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) {
+ *data = NULL;
+ *length = 0;
+ return 0;
+ }
}
- *data = (char*)s->peek_memchunk.memblock->data + s->peek_memchunk.index;
+ *data = (const char*) s->peek_memchunk.memblock->data + s->peek_memchunk.index;
*length = s->peek_memchunk.length;
+ return 0;
}
-void pa_stream_drop(pa_stream *s) {
- assert(s && s->peek_memchunk.memblock && s->state == PA_STREAM_READY && s->ref >= 1);
-
- s->counter += s->peek_memchunk.length;
+int pa_stream_drop(pa_stream *s) {
+ assert(s);
+ assert(s->ref >= 1);
+ PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE);
+
+ pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length);
+
pa_memblock_unref(s->peek_memchunk.memblock);
-
s->peek_memchunk.length = 0;
+ s->peek_memchunk.index = 0;
s->peek_memchunk.memblock = NULL;
+
+ s->counter += s->peek_memchunk.length;
+ return 0;
}
size_t pa_stream_writable_size(pa_stream *s) {
- assert(s && s->ref >= 1);
- return s->state == PA_STREAM_READY ? s->requested_bytes : 0;
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE, (size_t) -1);
+
+ return s->requested_bytes;
}
size_t pa_stream_readable_size(pa_stream *s) {
- size_t sz;
-
- assert(s && s->ref >= 1);
-
- if (s->state != PA_STREAM_READY)
- return 0;
-
- assert(s->record_memblockq);
-
- sz = (size_t)pa_memblockq_get_length(s->record_memblockq);
+ assert(s);
+ assert(s->ref >= 1);
- if (s->peek_memchunk.memblock)
- sz += s->peek_memchunk.length;
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1);
+ PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1);
- return sz;
+ return pa_memblockq_get_length(s->record_memblockq);
}
pa_operation * pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata) {
pa_operation *o;
pa_tagstruct *t;
uint32_t tag;
- assert(s && s->ref >= 1 && s->state == PA_STREAM_READY);
+
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
o = pa_operation_new(s->context, s);
- assert(o);
o->callback = (pa_operation_callback) cb;
o->userdata = userdata;
t = pa_tagstruct_new(NULL, 0);
- assert(t);
pa_tagstruct_putu32(t, PA_COMMAND_DRAIN_PLAYBACK_STREAM);
pa_tagstruct_putu32(t, tag = s->context->ctag++);
pa_tagstruct_putu32(t, s->channel);
@@ -501,7 +624,7 @@ static void stream_get_latency_info_callback(pa_pdispatch *pd, uint32_t command,
pa_tagstruct_get_timeval(t, &remote) < 0 ||
pa_tagstruct_getu64(t, &i.counter) < 0 ||
!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
} else {
pa_gettimeofday(&now);
@@ -549,15 +672,18 @@ pa_operation* pa_stream_get_latency_info(pa_stream *s, void (*cb)(pa_stream *p,
pa_operation *o;
pa_tagstruct *t;
struct timeval now;
- assert(s && s->direction != PA_STREAM_UPLOAD);
+
+ assert(s);
+ assert(s->ref >= 1);
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
+
o = pa_operation_new(s->context, s);
- assert(o);
o->callback = (pa_operation_callback) cb;
o->userdata = userdata;
t = pa_tagstruct_new(NULL, 0);
- assert(t);
pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY);
pa_tagstruct_putu32(t, tag = s->context->ctag++);
pa_tagstruct_putu32(t, s->channel);
@@ -585,7 +711,7 @@ void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UN
pa_stream_set_state(s, PA_STREAM_FAILED);
goto finish;
} else if (!pa_tagstruct_eof(t)) {
- pa_context_fail(s->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(s->context, PA_ERR_PROTOCOL);
goto finish;
}
@@ -595,18 +721,19 @@ finish:
pa_stream_unref(s);
}
-void pa_stream_disconnect(pa_stream *s) {
+int pa_stream_disconnect(pa_stream *s) {
pa_tagstruct *t;
uint32_t tag;
- assert(s && s->ref >= 1);
- if (!s->channel_valid || !s->context->state == PA_CONTEXT_READY)
- return;
+ assert(s);
+ assert(s->ref >= 1);
+
+ PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE);
+ PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE);
pa_stream_ref(s);
t = pa_tagstruct_new(NULL, 0);
- assert(t);
pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM :
(s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM));
@@ -616,26 +743,49 @@ void pa_stream_disconnect(pa_stream *s) {
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s);
pa_stream_unref(s);
+ return 0;
}
-void pa_stream_set_read_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) {
- assert(s && s->ref >= 1);
+void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
s->read_callback = cb;
s->read_userdata = userdata;
}
-void pa_stream_set_write_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) {
- assert(s && s->ref >= 1);
+void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
s->write_callback = cb;
s->write_userdata = userdata;
}
-void pa_stream_set_state_callback(pa_stream *s, void (*cb)(pa_stream *s, void *userdata), void *userdata) {
- assert(s && s->ref >= 1);
+void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
s->state_callback = cb;
s->state_userdata = userdata;
}
+void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
+ s->overflow_callback = cb;
+ s->overflow_userdata = userdata;
+}
+
+void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) {
+ assert(s);
+ assert(s->ref >= 1);
+
+ s->underflow_callback = cb;
+ s->underflow_userdata = userdata;
+}
+
void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) {
pa_operation *o = userdata;
int success = 1;
@@ -647,7 +797,7 @@ void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UN
success = 0;
} else if (!pa_tagstruct_eof(t)) {
- pa_context_fail(o->context, PA_ERROR_PROTOCOL);
+ pa_context_fail(o->context, PA_ERR_PROTOCOL);
goto finish;
}