diff options
author | Lennart Poettering <lennart@poettering.net> | 2006-02-20 04:05:16 +0000 |
---|---|---|
committer | Lennart Poettering <lennart@poettering.net> | 2006-02-20 04:05:16 +0000 |
commit | 304449002cbc84fdcf235b5dfaec891278dd7085 (patch) | |
tree | 2a2d00e34d5c620835b76a0d6f7890a1d3e9fb97 /src/polyp/stream.c | |
parent | 0876b1ba82ea9c988df90ca98d202765ac697313 (diff) |
1) Add flexible seeking support (including absolute) for memory block queues and playback streams
2) Add support to synchronize multiple playback streams
3) add two tests for 1) and 2)
4) s/PA_ERROR/PA_ERR/
5) s/PA_ERROR_OK/PA_OK/
6) update simple API to deal properly with new peek/drop recording API
7) add beginnings of proper validity checking on API calls in client libs (needs to be extended)
8) report playback buffer overflows/underflows to the client
9) move client side recording mcalign stuff into the memblockq
10) create typedefs for a bunch of API callback prototypes
11) simplify handling of HUP poll() events
Yes, i know, it's usually better to commit a lot of small patches instead of a
single big one. In this case however, this would have contradicted the other
rule: never commit broken or incomplete stuff.
*** This stuff needs a lot of additional testing! ***
git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@511 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src/polyp/stream.c')
-rw-r--r-- | src/polyp/stream.c | 366 |
1 files changed, 258 insertions, 108 deletions
diff --git a/src/polyp/stream.c b/src/polyp/stream.c index 35de2d01..5497f0c4 100644 --- a/src/polyp/stream.c +++ b/src/polyp/stream.c @@ -28,6 +28,7 @@ #include <stdio.h> #include <string.h> +#include <polyp/def.h> #include <polypcore/xmalloc.h> #include <polypcore/pstream-util.h> #include <polypcore/util.h> @@ -39,14 +40,11 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *ss, const pa_channel_map *map) { pa_stream *s; + assert(c); - assert(ss); - - if (!pa_sample_spec_valid(ss)) - return NULL; - if (map && !pa_channel_map_valid(map)) - return NULL; + PA_CHECK_VALIDITY_RETURN_NULL(c, ss && pa_sample_spec_valid(ss), PA_ERR_INVALID); + PA_CHECK_VALIDITY_RETURN_NULL(c, !map || pa_channel_map_valid(map), PA_ERR_INVALID); s = pa_xnew(pa_stream, 1); s->ref = 1; @@ -59,6 +57,10 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec * s->write_userdata = NULL; s->state_callback = NULL; s->state_userdata = NULL; + s->overflow_callback = NULL; + s->overflow_userdata = NULL; + s->underflow_callback = NULL; + s->underflow_userdata = NULL; s->direction = PA_STREAM_NODIRECTION; s->name = pa_xstrdup(name); @@ -71,13 +73,13 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec * s->channel = 0; s->channel_valid = 0; + s->syncid = c->csyncid++; s->device_index = PA_INVALID_INDEX; s->requested_bytes = 0; - s->state = PA_STREAM_DISCONNECTED; + s->state = PA_STREAM_UNCONNECTED; memset(&s->buffer_attr, 0, sizeof(s->buffer_attr)); - s->mcalign = pa_mcalign_new(pa_frame_size(ss), c->memblock_stat); - + s->peek_memchunk.index = 0; s->peek_memchunk.length = 0; s->peek_memchunk.memblock = NULL; @@ -114,42 +116,52 @@ static void stream_free(pa_stream *s) { if (s->record_memblockq) pa_memblockq_free(s->record_memblockq); - pa_mcalign_free(s->mcalign); - pa_xfree(s->name); pa_xfree(s); } void pa_stream_unref(pa_stream *s) { - assert(s && s->ref >= 1); + assert(s); + assert(s->ref >= 1); if (--(s->ref) == 0) stream_free(s); } pa_stream* pa_stream_ref(pa_stream *s) { - assert(s && s->ref >= 1); + assert(s); + assert(s->ref >= 1); + s->ref++; return s; } pa_stream_state_t pa_stream_get_state(pa_stream *s) { - assert(s && s->ref >= 1); + assert(s); + assert(s->ref >= 1); + return s->state; } pa_context* pa_stream_get_context(pa_stream *s) { - assert(s && s->ref >= 1); + assert(s); + assert(s->ref >= 1); + return s->context; } uint32_t pa_stream_get_index(pa_stream *s) { - assert(s && s->ref >= 1); + assert(s); + assert(s->ref >= 1); + + PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX); + return s->device_index; } void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) { - assert(s && s->ref >= 1); + assert(s); + assert(s->ref >= 1); if (s->state == st) return; @@ -159,6 +171,8 @@ void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) { s->state = st; if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED) && s->context) { + /* Detach from context */ + if (s->channel_valid) pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL); @@ -182,14 +196,14 @@ void pa_command_stream_killed(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED if (pa_tagstruct_getu32(t, &channel) < 0 || !pa_tagstruct_eof(t)) { - pa_context_fail(c, PA_ERROR_PROTOCOL); + pa_context_fail(c, PA_ERR_PROTOCOL); goto finish; } if (!(s = pa_dynarray_get(command == PA_COMMAND_PLAYBACK_STREAM_KILLED ? c->playback_streams : c->record_streams, channel))) goto finish; - c->error = PA_ERROR_KILLED; + c->error = PA_ERR_KILLED; pa_stream_set_state(s, PA_STREAM_FAILED); finish: @@ -207,24 +221,55 @@ void pa_command_request(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32 if (pa_tagstruct_getu32(t, &channel) < 0 || pa_tagstruct_getu32(t, &bytes) < 0 || !pa_tagstruct_eof(t)) { - pa_context_fail(c, PA_ERROR_PROTOCOL); + pa_context_fail(c, PA_ERR_PROTOCOL); goto finish; } if (!(s = pa_dynarray_get(c->playback_streams, channel))) goto finish; - if (s->state != PA_STREAM_READY) - goto finish; + if (s->state == PA_STREAM_READY) { + s->requested_bytes += bytes; + + if (s->requested_bytes > 0 && s->write_callback) + s->write_callback(s, s->requested_bytes, s->write_userdata); + } - pa_stream_ref(s); +finish: + pa_context_unref(c); +} + +void pa_command_overflow_or_underflow(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) { + pa_stream *s; + pa_context *c = userdata; + uint32_t channel; + + assert(pd); + assert(command == PA_COMMAND_OVERFLOW || command == PA_COMMAND_UNDERFLOW); + assert(t); + assert(c); + + pa_context_ref(c); + + if (pa_tagstruct_getu32(t, &channel) < 0 || + !pa_tagstruct_eof(t)) { + pa_context_fail(c, PA_ERR_PROTOCOL); + goto finish; + } - s->requested_bytes += bytes; + if (!(s = pa_dynarray_get(c->playback_streams, channel))) + goto finish; - if (s->requested_bytes && s->write_callback) - s->write_callback(s, s->requested_bytes, s->write_userdata); + if (s->state == PA_STREAM_READY) { - pa_stream_unref(s); + if (command == PA_COMMAND_OVERFLOW) { + if (s->overflow_callback) + s->overflow_callback(s, s->overflow_userdata); + } else if (command == PA_COMMAND_UNDERFLOW) { + if (s->underflow_callback) + s->underflow_callback(s, s->underflow_userdata); + } + } finish: pa_context_unref(c); @@ -270,14 +315,21 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED ((s->direction != PA_STREAM_UPLOAD) && pa_tagstruct_getu32(t, &s->device_index) < 0) || ((s->direction != PA_STREAM_RECORD) && pa_tagstruct_getu32(t, &s->requested_bytes) < 0) || !pa_tagstruct_eof(t)) { - pa_context_fail(s->context, PA_ERROR_PROTOCOL); + pa_context_fail(s->context, PA_ERR_PROTOCOL); goto finish; } if (s->direction == PA_STREAM_RECORD) { assert(!s->record_memblockq); - s->record_memblockq = pa_memblockq_new(s->buffer_attr.maxlength, 0, - pa_frame_size(&s->sample_spec), 0, 0, s->context->memblock_stat); + s->record_memblockq = pa_memblockq_new( + 0, + s->buffer_attr.maxlength, + 0, + pa_frame_size(&s->sample_spec), + 1, + 0, + NULL, + s->context->memblock_stat); assert(s->record_memblockq); } @@ -303,13 +355,32 @@ finish: pa_stream_unref(s); } -static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, const pa_cvolume *volume) { +static int create_stream( + pa_stream_direction_t direction, + pa_stream *s, + const char *dev, + const pa_buffer_attr *attr, + pa_stream_flags_t flags, + const pa_cvolume *volume, + pa_stream *sync_stream) { + pa_tagstruct *t; uint32_t tag; - assert(s && s->ref >= 1 && s->state == PA_STREAM_DISCONNECTED); + + assert(s); + assert(s->ref >= 1); + + PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_UNCONNECTED, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, (flags & ~(PA_STREAM_START_CORKED|PA_STREAM_INTERPOLATE_LATENCY)) == 0, PA_ERR_INVALID); + PA_CHECK_VALIDITY(s->context, direction == PA_STREAM_PLAYBACK || flags == 0, PA_ERR_INVALID); pa_stream_ref(s); + s->direction = direction; + + if (sync_stream) + s->syncid = sync_stream->syncid; + s->interpolate = !!(flags & PA_STREAM_INTERPOLATE_LATENCY); pa_stream_trash_ipol(s); @@ -336,25 +407,28 @@ static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *a dev = s->context->conf->default_source; } - pa_tagstruct_put(t, - PA_TAG_U32, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM, - PA_TAG_U32, tag = s->context->ctag++, - PA_TAG_STRING, s->name, - PA_TAG_SAMPLE_SPEC, &s->sample_spec, - PA_TAG_CHANNEL_MAP, &s->channel_map, - PA_TAG_U32, PA_INVALID_INDEX, - PA_TAG_STRING, dev, - PA_TAG_U32, s->buffer_attr.maxlength, - PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED), - PA_TAG_INVALID); + pa_tagstruct_put( + t, + PA_TAG_U32, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM, + PA_TAG_U32, tag = s->context->ctag++, + PA_TAG_STRING, s->name, + PA_TAG_SAMPLE_SPEC, &s->sample_spec, + PA_TAG_CHANNEL_MAP, &s->channel_map, + PA_TAG_U32, PA_INVALID_INDEX, + PA_TAG_STRING, dev, + PA_TAG_U32, s->buffer_attr.maxlength, + PA_TAG_BOOLEAN, !!(flags & PA_STREAM_START_CORKED), + PA_TAG_INVALID); if (s->direction == PA_STREAM_PLAYBACK) { pa_cvolume cv; - pa_tagstruct_put(t, - PA_TAG_U32, s->buffer_attr.tlength, - PA_TAG_U32, s->buffer_attr.prebuf, - PA_TAG_U32, s->buffer_attr.minreq, - PA_TAG_INVALID); + pa_tagstruct_put( + t, + PA_TAG_U32, s->buffer_attr.tlength, + PA_TAG_U32, s->buffer_attr.prebuf, + PA_TAG_U32, s->buffer_attr.minreq, + PA_TAG_U32, s->syncid, + PA_TAG_INVALID); if (!volume) { pa_cvolume_reset(&cv, s->sample_spec.channels); @@ -369,23 +443,57 @@ static void create_stream(pa_stream *s, const char *dev, const pa_buffer_attr *a pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s); pa_stream_unref(s); + return 0; } -void pa_stream_connect_playback(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags, pa_cvolume *volume) { - assert(s && s->context->state == PA_CONTEXT_READY && s->ref >= 1); - s->direction = PA_STREAM_PLAYBACK; - create_stream(s, dev, attr, flags, volume); +int pa_stream_connect_playback( + pa_stream *s, + const char *dev, + const pa_buffer_attr *attr, + pa_stream_flags_t flags, + pa_cvolume *volume, + pa_stream *sync_stream) { + + assert(s); + assert(s->ref >= 1); + + return create_stream(PA_STREAM_PLAYBACK, s, dev, attr, flags, volume, sync_stream); } -void pa_stream_connect_record(pa_stream *s, const char *dev, const pa_buffer_attr *attr, pa_stream_flags_t flags) { - assert(s && s->context->state == PA_CONTEXT_READY && s->ref >= 1); - s->direction = PA_STREAM_RECORD; - create_stream(s, dev, attr, flags, 0); +int pa_stream_connect_record( + pa_stream *s, + const char *dev, + const pa_buffer_attr *attr, + pa_stream_flags_t flags) { + + assert(s); + assert(s->ref >= 1); + + return create_stream(PA_STREAM_RECORD, s, dev, attr, flags, NULL, NULL); } -void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p), size_t delta) { +int pa_stream_write( + pa_stream *s, + const void *data, + size_t length, + void (*free_cb)(void *p), + int64_t offset, + pa_seek_mode_t seek) { + pa_memchunk chunk; - assert(s && s->context && data && length && s->state == PA_STREAM_READY && s->ref >= 1); + + assert(s); + assert(s->ref >= 1); + assert(s->context); + assert(data); + + PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID); + PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID); + + if (length <= 0) + return 0; if (free_cb) { chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat); @@ -398,7 +506,7 @@ void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_ chunk.index = 0; chunk.length = length; - pa_pstream_send_memblock(s->context->pstream, s->channel, delta, &chunk); + pa_pstream_send_memblock(s->context->pstream, s->channel, offset, seek, &chunk); pa_memblock_unref(chunk.memblock); if (length < s->requested_bytes) @@ -407,72 +515,87 @@ void pa_stream_write(pa_stream *s, const void *data, size_t length, void (*free_ s->requested_bytes = 0; s->counter += length; + return 0; } -void pa_stream_peek(pa_stream *s, void **data, size_t *length) { - assert(s && s->record_memblockq && data && length && s->state == PA_STREAM_READY && s->ref >= 1); +int pa_stream_peek(pa_stream *s, const void **data, size_t *length) { + assert(s); + assert(s->ref >= 1); + assert(data); + assert(length); + PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); + if (!s->peek_memchunk.memblock) { - *data = NULL; - *length = 0; - - if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) - return; - pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length); + if (pa_memblockq_peek(s->record_memblockq, &s->peek_memchunk) < 0) { + *data = NULL; + *length = 0; + return 0; + } } - *data = (char*)s->peek_memchunk.memblock->data + s->peek_memchunk.index; + *data = (const char*) s->peek_memchunk.memblock->data + s->peek_memchunk.index; *length = s->peek_memchunk.length; + return 0; } -void pa_stream_drop(pa_stream *s) { - assert(s && s->peek_memchunk.memblock && s->state == PA_STREAM_READY && s->ref >= 1); - - s->counter += s->peek_memchunk.length; +int pa_stream_drop(pa_stream *s) { + assert(s); + assert(s->ref >= 1); + PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE); + + pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length); + pa_memblock_unref(s->peek_memchunk.memblock); - s->peek_memchunk.length = 0; + s->peek_memchunk.index = 0; s->peek_memchunk.memblock = NULL; + + s->counter += s->peek_memchunk.length; + return 0; } size_t pa_stream_writable_size(pa_stream *s) { - assert(s && s->ref >= 1); - return s->state == PA_STREAM_READY ? s->requested_bytes : 0; + assert(s); + assert(s->ref >= 1); + + PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1); + PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE, (size_t) -1); + + return s->requested_bytes; } size_t pa_stream_readable_size(pa_stream *s) { - size_t sz; - - assert(s && s->ref >= 1); - - if (s->state != PA_STREAM_READY) - return 0; - - assert(s->record_memblockq); - - sz = (size_t)pa_memblockq_get_length(s->record_memblockq); + assert(s); + assert(s->ref >= 1); - if (s->peek_memchunk.memblock) - sz += s->peek_memchunk.length; + PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE, (size_t) -1); + PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1); - return sz; + return pa_memblockq_get_length(s->record_memblockq); } pa_operation * pa_stream_drain(pa_stream *s, void (*cb) (pa_stream*s, int success, void *userdata), void *userdata) { pa_operation *o; pa_tagstruct *t; uint32_t tag; - assert(s && s->ref >= 1 && s->state == PA_STREAM_READY); + + assert(s); + assert(s->ref >= 1); + + PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE); o = pa_operation_new(s->context, s); - assert(o); o->callback = (pa_operation_callback) cb; o->userdata = userdata; t = pa_tagstruct_new(NULL, 0); - assert(t); pa_tagstruct_putu32(t, PA_COMMAND_DRAIN_PLAYBACK_STREAM); pa_tagstruct_putu32(t, tag = s->context->ctag++); pa_tagstruct_putu32(t, s->channel); @@ -501,7 +624,7 @@ static void stream_get_latency_info_callback(pa_pdispatch *pd, uint32_t command, pa_tagstruct_get_timeval(t, &remote) < 0 || pa_tagstruct_getu64(t, &i.counter) < 0 || !pa_tagstruct_eof(t)) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } else { pa_gettimeofday(&now); @@ -549,15 +672,18 @@ pa_operation* pa_stream_get_latency_info(pa_stream *s, void (*cb)(pa_stream *p, pa_operation *o; pa_tagstruct *t; struct timeval now; - assert(s && s->direction != PA_STREAM_UPLOAD); + + assert(s); + assert(s->ref >= 1); + PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + o = pa_operation_new(s->context, s); - assert(o); o->callback = (pa_operation_callback) cb; o->userdata = userdata; t = pa_tagstruct_new(NULL, 0); - assert(t); pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_GET_PLAYBACK_LATENCY : PA_COMMAND_GET_RECORD_LATENCY); pa_tagstruct_putu32(t, tag = s->context->ctag++); pa_tagstruct_putu32(t, s->channel); @@ -585,7 +711,7 @@ void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UN pa_stream_set_state(s, PA_STREAM_FAILED); goto finish; } else if (!pa_tagstruct_eof(t)) { - pa_context_fail(s->context, PA_ERROR_PROTOCOL); + pa_context_fail(s->context, PA_ERR_PROTOCOL); goto finish; } @@ -595,18 +721,19 @@ finish: pa_stream_unref(s); } -void pa_stream_disconnect(pa_stream *s) { +int pa_stream_disconnect(pa_stream *s) { pa_tagstruct *t; uint32_t tag; - assert(s && s->ref >= 1); - if (!s->channel_valid || !s->context->state == PA_CONTEXT_READY) - return; + assert(s); + assert(s->ref >= 1); + + PA_CHECK_VALIDITY(s->context, s->channel_valid, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE); pa_stream_ref(s); t = pa_tagstruct_new(NULL, 0); - assert(t); pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_DELETE_PLAYBACK_STREAM : (s->direction == PA_STREAM_RECORD ? PA_COMMAND_DELETE_RECORD_STREAM : PA_COMMAND_DELETE_UPLOAD_STREAM)); @@ -616,26 +743,49 @@ void pa_stream_disconnect(pa_stream *s) { pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s); pa_stream_unref(s); + return 0; } -void pa_stream_set_read_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) { - assert(s && s->ref >= 1); +void pa_stream_set_read_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) { + assert(s); + assert(s->ref >= 1); + s->read_callback = cb; s->read_userdata = userdata; } -void pa_stream_set_write_callback(pa_stream *s, void (*cb)(pa_stream *p, size_t length, void *userdata), void *userdata) { - assert(s && s->ref >= 1); +void pa_stream_set_write_callback(pa_stream *s, pa_stream_request_cb_t cb, void *userdata) { + assert(s); + assert(s->ref >= 1); + s->write_callback = cb; s->write_userdata = userdata; } -void pa_stream_set_state_callback(pa_stream *s, void (*cb)(pa_stream *s, void *userdata), void *userdata) { - assert(s && s->ref >= 1); +void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { + assert(s); + assert(s->ref >= 1); + s->state_callback = cb; s->state_userdata = userdata; } +void pa_stream_set_overflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { + assert(s); + assert(s->ref >= 1); + + s->overflow_callback = cb; + s->overflow_userdata = userdata; +} + +void pa_stream_set_underflow_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) { + assert(s); + assert(s->ref >= 1); + + s->underflow_callback = cb; + s->underflow_userdata = userdata; +} + void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) { pa_operation *o = userdata; int success = 1; @@ -647,7 +797,7 @@ void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UN success = 0; } else if (!pa_tagstruct_eof(t)) { - pa_context_fail(o->context, PA_ERROR_PROTOCOL); + pa_context_fail(o->context, PA_ERR_PROTOCOL); goto finish; } |