diff options
Diffstat (limited to 'src/polyp/stream.c')
-rw-r--r-- | src/polyp/stream.c | 72 |
1 files changed, 44 insertions, 28 deletions
diff --git a/src/polyp/stream.c b/src/polyp/stream.c index f11ef493..c86a200a 100644 --- a/src/polyp/stream.c +++ b/src/polyp/stream.c @@ -106,21 +106,15 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec * s->auto_timing_update_event = NULL; s->auto_timing_update_requested = 0; + /* Refcounting is strictly one-way: from the "bigger" to the "smaller" object. */ PA_LLIST_PREPEND(pa_stream, c->streams, s); - - /* The context and stream will point at each other. We cannot ref count - both though since that will create a loop. */ - pa_context_ref(s->context); + pa_stream_ref(s); return s; } static void stream_free(pa_stream *s) { - assert(s && s->context && !s->channel_valid); - - PA_LLIST_REMOVE(pa_stream, s->context->streams, s); - - pa_context_unref(s->context); + assert(s && !s->context && !s->channel_valid); if (s->auto_timing_update_event) { assert(s->mainloop); @@ -186,20 +180,38 @@ void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) { pa_stream_ref(s); s->state = st; - + if (s->state_callback) + s->state_callback(s, s->state_userdata); + if ((st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED) && s->context) { + /* Detach from context */ + pa_operation *o, *n; + + /* Unref all operatio object that point to us */ + for (o = s->context->operations; o; o = n) { + n = o->next; + + if (o->stream == s) + pa_operation_cancel(o); + } + /* Drop all outstanding replies for this stream */ + if (s->context->pdispatch) + pa_pdispatch_unregister_reply(s->context->pdispatch, s); + if (s->channel_valid) pa_dynarray_put((s->direction == PA_STREAM_PLAYBACK) ? s->context->playback_streams : s->context->record_streams, s->channel, NULL); + + PA_LLIST_REMOVE(pa_stream, s->context->streams, s); + pa_stream_unref(s); s->channel = 0; s->channel_valid = 0; + + s->context = NULL; } - if (s->state_callback) - s->state_callback(s, s->state_userdata); - pa_stream_unref(s); } @@ -513,7 +525,7 @@ static int create_stream( pa_tagstruct_putu32(t, s->buffer_attr.fragsize); pa_pstream_send_tagstruct(s->context->pstream, t); - pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_create_stream_callback, s, NULL); pa_stream_set_state(s, PA_STREAM_CREATING); @@ -704,9 +716,9 @@ pa_operation * pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *us t = pa_tagstruct_command(s->context, PA_COMMAND_DRAIN_PLAYBACK_STREAM, &tag); pa_tagstruct_putu32(t, s->channel); pa_pstream_send_tagstruct(s->context->pstream, t); - pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); - return pa_operation_ref(o); + return o; } static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { @@ -717,7 +729,9 @@ static void stream_get_timing_info_callback(pa_pdispatch *pd, uint32_t command, assert(pd); assert(o); assert(o->stream); - assert(o->context); + + if (!o->context) + goto finish; i = &o->stream->timing_info; @@ -869,7 +883,7 @@ pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t /* Check if we could allocate a correction slot. If not, there are too many outstanding queries */ PA_CHECK_VALIDITY_RETURN_NULL(s->context, !s->write_index_corrections[cidx].valid, PA_ERR_INTERNAL); } - o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); + o = pa_operation_new(s->context, s, (pa_operation_cb_t) cb, userdata); t = pa_tagstruct_command( s->context, @@ -879,7 +893,7 @@ pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t pa_tagstruct_put_timeval(t, pa_gettimeofday(&now)); pa_pstream_send_tagstruct(s->context->pstream, t); - pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, o); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_get_timing_info_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); if (s->direction == PA_STREAM_PLAYBACK) { /* Fill in initial correction data */ @@ -893,7 +907,7 @@ pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t /* pa_log("requesting update %u\n", tag); */ - return pa_operation_ref(o); + return o; } void pa_stream_disconnect_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED uint32_t tag, pa_tagstruct *t, void *userdata) { @@ -941,7 +955,7 @@ int pa_stream_disconnect(pa_stream *s) { &tag); pa_tagstruct_putu32(t, s->channel); pa_pstream_send_tagstruct(s->context->pstream, t); - pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_disconnect_callback, s, NULL); pa_stream_unref(s); return 0; @@ -993,9 +1007,11 @@ void pa_stream_simple_ack_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UN assert(pd); assert(o); - assert(o->context); assert(o->ref >= 1); + if (!o->context) + goto finish; + if (command != PA_COMMAND_REPLY) { if (pa_context_handle_error(o->context, command, t) < 0) goto finish; @@ -1038,12 +1054,12 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi pa_tagstruct_putu32(t, s->channel); pa_tagstruct_put_boolean(t, !!b); pa_pstream_send_tagstruct(s->context->pstream, t); - pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); if (s->direction == PA_STREAM_PLAYBACK) invalidate_indexes(s, 1, 0); - return pa_operation_ref(o); + return o; } static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, pa_stream_success_cb_t cb, void *userdata) { @@ -1061,9 +1077,9 @@ static pa_operation* stream_send_simple_command(pa_stream *s, uint32_t command, t = pa_tagstruct_command(s->context, command, &tag); pa_tagstruct_putu32(t, s->channel); pa_pstream_send_tagstruct(s->context->pstream, t); - pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); - return pa_operation_ref(o); + return o; } pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) { @@ -1136,9 +1152,9 @@ pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_succe pa_tagstruct_putu32(t, s->channel); pa_tagstruct_puts(t, name); pa_pstream_send_tagstruct(s->context->pstream, t); - pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, pa_operation_ref(o), (pa_free_cb_t) pa_operation_unref); - return pa_operation_ref(o); + return o; } int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) { |