From 863fb90d90c2e57e60a0f5b81e0847319399b8ed Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Wed, 7 Jul 2004 22:02:15 +0000 Subject: add output stream draining git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@53 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/memblockq.c | 2 +- src/pacat-simple.c | 6 +++++ src/pacat.c | 17 ++++++++++--- src/pdispatch.c | 43 ++++++++++++++++++++------------ src/pdispatch.h | 3 +++ src/polyp.c | 61 +++++++++++++++++++++++++++++++++++++++++++++- src/polyp.h | 6 +++++ src/protocol-native-spec.h | 1 + src/protocol-native.c | 52 ++++++++++++++++++++++++++++++++++++++- src/simple.c | 34 ++++++++++++++++++++++---- src/simple.h | 2 ++ src/todo | 1 - 12 files changed, 200 insertions(+), 28 deletions(-) diff --git a/src/memblockq.c b/src/memblockq.c index e5dab687..fb4cbc7e 100644 --- a/src/memblockq.c +++ b/src/memblockq.c @@ -228,7 +228,7 @@ void pa_memblockq_empty(struct pa_memblockq *bq) { int pa_memblockq_is_readable(struct pa_memblockq *bq) { assert(bq); - return bq->current_length >= bq->prebuf; + return bq->current_length && (bq->current_length >= bq->prebuf); } int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length) { diff --git a/src/pacat-simple.c b/src/pacat-simple.c index 8b48bdd3..896df814 100644 --- a/src/pacat-simple.c +++ b/src/pacat-simple.c @@ -41,6 +41,12 @@ int main(int argc, char*argv[]) { } } + /* Make sure that every single sample way played */ + if (pa_simple_drain(s, &error) < 0) { + fprintf(stderr, __FILE__": pa_simple_drain() failed: %s\n", pa_strerror(error)); + goto finish; + } + ret = 0; finish: diff --git a/src/pacat.c b/src/pacat.c index 75a94fc0..59ccc462 100644 --- a/src/pacat.c +++ b/src/pacat.c @@ -117,6 +117,18 @@ static void context_drain_complete(struct pa_context*c, void *userdata) { quit(0); } +static void stream_drain_complete(struct pa_stream*s, void *userdata) { + fprintf(stderr, "Playback stream drained.\n"); + + pa_stream_free(stream); + stream = NULL; + + if (pa_context_drain(context, context_drain_complete, NULL) < 0) + quit(0); + else + fprintf(stderr, "Draining connection to server.\n"); +} + static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { size_t l, w = 0; ssize_t r; @@ -135,10 +147,7 @@ static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_m if ((r = read(fd, buffer, l)) <= 0) { if (r == 0) { fprintf(stderr, "Got EOF.\n"); - if (pa_context_drain(context, context_drain_complete, NULL) < 0) - quit(0); - else - fprintf(stderr, "Draining connection to server.\n"); + pa_stream_drain(stream, stream_drain_complete, NULL); } else { fprintf(stderr, "read() failed: %s\n", strerror(errno)); quit(1); diff --git a/src/pdispatch.c b/src/pdispatch.c index ec454190..b7257dd4 100644 --- a/src/pdispatch.c +++ b/src/pdispatch.c @@ -18,6 +18,7 @@ static const char *command_names[PA_COMMAND_MAX] = { [PA_COMMAND_SET_NAME] = "SET_NAME", [PA_COMMAND_LOOKUP_SINK] = "LOOKUP_SINK", [PA_COMMAND_LOOKUP_SOURCE] = "LOOKUP_SOURCE", + [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = "DRAIN_PLAYBACK_STREAM", }; struct reply_info { @@ -27,6 +28,7 @@ struct reply_info { void *userdata; uint32_t tag; void *mainloop_timeout; + int callback_is_running; }; struct pa_pdispatch { @@ -77,7 +79,7 @@ struct pa_pdispatch* pa_pdispatch_new(struct pa_mainloop_api *mainloop, const st void pa_pdispatch_free(struct pa_pdispatch *pd) { assert(pd); - + if (pd->in_use) { pd->shall_free = 1; return; @@ -109,25 +111,23 @@ int pa_pdispatch_run(struct pa_pdispatch *pd, struct pa_packet*packet, void *use if (command == PA_COMMAND_ERROR || command == PA_COMMAND_REPLY) { struct reply_info *r; - for (r = pd->replies; r; r = r->next) { - if (r->tag != tag) - continue; - - pd->in_use = 1; + for (r = pd->replies; r; r = r->next) + if (r->tag == tag) + break; + + if (r) { + pd->in_use = r->callback_is_running = 1; assert(r->callback); r->callback(r->pdispatch, command, tag, ts, r->userdata); - pd->in_use = 0; + pd->in_use = r->callback_is_running = 0; reply_info_free(r); - if (pd->shall_free) { + if (pd->shall_free) pa_pdispatch_free(pd); - break; + else { + if (pd->drain_callback && !pa_pdispatch_is_pending(pd)) + pd->drain_callback(pd, pd->drain_userdata); } - - if (pd->drain_callback && !pa_pdispatch_is_pending(r->pdispatch)) - pd->drain_callback(r->pdispatch, r->pdispatch->drain_userdata); - - break; } } else if (pd->command_table && command < pd->n_commands) { @@ -169,7 +169,8 @@ void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int time r->callback = cb; r->userdata = userdata; r->tag = tag; - + r->callback_is_running = 0; + gettimeofday(&tv, NULL); tv.tv_sec += timeout; @@ -196,3 +197,15 @@ void pa_pdispatch_set_drain_callback(struct pa_pdispatch *pd, void (*cb)(struct pd->drain_callback = cb; pd->drain_userdata = userdata; } + +void pa_pdispatch_unregister_reply(struct pa_pdispatch *pd, void *userdata) { + struct reply_info *r, *n; + assert(pd); + + for (r = pd->replies; r; r = n) { + n = r->next; + + if (!r->callback_is_running && r->userdata == userdata) /* when this item's callback is currently running it is destroyed anyway in the very near future */ + reply_info_free(r); + } +} diff --git a/src/pdispatch.h b/src/pdispatch.h index 35e93829..ac372477 100644 --- a/src/pdispatch.h +++ b/src/pdispatch.h @@ -25,4 +25,7 @@ int pa_pdispatch_is_pending(struct pa_pdispatch *pd); void pa_pdispatch_set_drain_callback(struct pa_pdispatch *pd, void (*cb)(struct pa_pdispatch *pd, void *userdata), void *userdata); +/* Remove all reply slots with the give userdata parameter */ +void pa_pdispatch_unregister_reply(struct pa_pdispatch *pd, void *userdata); + #endif diff --git a/src/polyp.c b/src/polyp.c index 9af8d468..0f2a9181 100644 --- a/src/polyp.c +++ b/src/polyp.c @@ -75,6 +75,9 @@ struct pa_stream { void (*create_complete_callback)(struct pa_stream *s, int success, void *userdata); void *create_complete_userdata; + + void (*drain_complete_callback)(struct pa_stream *s, void *userdata); + void *drain_complete_userdata; void (*die_callback)(struct pa_stream*c, void *userdata); void *die_userdata; @@ -538,8 +541,10 @@ struct pa_stream* pa_stream_new( void pa_stream_free(struct pa_stream *s) { assert(s && s->context); - free(s->name); + pa_pdispatch_unregister_reply(s->context->pdispatch, s); + free(s->name); + if (s->channel_valid && s->context->state == CONTEXT_READY) { struct pa_tagstruct *t = pa_tagstruct_new(NULL, 0); assert(t); @@ -683,3 +688,57 @@ int pa_context_drain( return 0; } + +static void stream_drain_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { + struct pa_stream *s = userdata; + assert(pd && s); + + if (command != PA_COMMAND_REPLY) { + if (handle_error(s->context, command, t) < 0) { + context_dead(s->context); + return; + } + + stream_dead(s); + return; + } + + if (s->state != STREAM_READY) + return; + + if (!pa_tagstruct_eof(t)) { + s->context->error = PA_ERROR_PROTOCOL; + context_dead(s->context); + return; + } + + if (s->drain_complete_callback) { + void (*temp) (struct pa_stream*s, void *userdata) = s->drain_complete_callback; + s->drain_complete_callback = NULL; + temp(s, s->drain_complete_userdata); + } +} + + +void pa_stream_drain(struct pa_stream *s, void (*complete) (struct pa_stream*s, void *userdata), void *userdata) { + struct pa_tagstruct *t; + uint32_t tag; + assert(s && s->state == STREAM_READY); + + if (!complete) { + s->drain_complete_callback = NULL; + return; + } + + s->drain_complete_callback = complete; + s->drain_complete_userdata = userdata; + + t = pa_tagstruct_new(NULL, 0); + assert(t); + + pa_tagstruct_putu32(t, PA_COMMAND_DRAIN_PLAYBACK_STREAM); + pa_tagstruct_putu32(t, tag = s->context->ctag++); + pa_tagstruct_putu32(t, s->channel); + pa_pstream_send_tagstruct(s->context->pstream, t); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, stream_drain_callback, s); +} diff --git a/src/polyp.h b/src/polyp.h index 25ee3bed..c49a72b2 100644 --- a/src/polyp.h +++ b/src/polyp.h @@ -46,6 +46,12 @@ struct pa_stream* pa_stream_new( void pa_stream_free(struct pa_stream *p); +void pa_stream_drain( + struct pa_stream *s, + void (*complete) (struct pa_stream*s, void *userdata), + void *userdata); + + void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata); void pa_stream_set_write_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata); diff --git a/src/protocol-native-spec.h b/src/protocol-native-spec.h index 7fb9ac4a..fea14cc0 100644 --- a/src/protocol-native-spec.h +++ b/src/protocol-native-spec.h @@ -15,6 +15,7 @@ enum { PA_COMMAND_SET_NAME, PA_COMMAND_LOOKUP_SINK, PA_COMMAND_LOOKUP_SOURCE, + PA_COMMAND_DRAIN_PLAYBACK_STREAM, PA_COMMAND_MAX }; diff --git a/src/protocol-native.c b/src/protocol-native.c index 42ff4b52..110d0d6b 100644 --- a/src/protocol-native.c +++ b/src/protocol-native.c @@ -32,6 +32,8 @@ struct playback_stream { struct pa_sink_input *sink_input; struct pa_memblockq *memblockq; size_t requested_bytes; + int drain_request; + uint32_t drain_tag; }; struct connection { @@ -61,6 +63,7 @@ static void request_bytes(struct playback_stream*s); static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); @@ -71,6 +74,7 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { [PA_COMMAND_REPLY] = { NULL }, [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream }, [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream }, + [PA_COMMAND_DRAIN_PLAYBACK_STREAM] = { command_drain_playback_stream }, [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL }, [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL }, [PA_COMMAND_AUTH] = { command_auth }, @@ -116,6 +120,7 @@ static struct playback_stream* playback_stream_new(struct connection *c, struct assert(s->memblockq); s->requested_bytes = 0; + s->drain_request = 0; pa_idxset_put(c->playback_streams, s, &s->index); return s; @@ -124,6 +129,9 @@ static struct playback_stream* playback_stream_new(struct connection *c, struct static void playback_stream_free(struct playback_stream* p) { assert(p && p->connection); + if (p->drain_request) + pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERROR_NOENTITY); + pa_idxset_remove_by_data(p->connection->playback_streams, p, NULL); pa_sink_input_free(p->sink_input); pa_memblockq_free(p->memblockq); @@ -199,6 +207,11 @@ static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) { pa_memblockq_drop(s->memblockq, length); request_bytes(s); + + if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) { + pa_pstream_send_simple_ack(s->connection->pstream, s->drain_tag); + s->drain_request = 0; + } } static void sink_input_kill_cb(struct pa_sink_input *i) { @@ -258,7 +271,7 @@ static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t com sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index); if (!sink) { - pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST); + pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY); return; } @@ -373,6 +386,11 @@ static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t t return; } + if (!c->authorized) { + pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); + return; + } + if (command == PA_COMMAND_LOOKUP_SINK) { struct pa_sink *sink; if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK))) @@ -397,6 +415,38 @@ static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t t } } +static void command_drain_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { + struct connection *c = userdata; + uint32_t index; + struct playback_stream *s; + assert(c && t); + + if (pa_tagstruct_getu32(t, &index) < 0 || + !pa_tagstruct_eof(t)) { + protocol_error(c); + return; + } + + if (!c->authorized) { + pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); + return; + } + + if (!(s = pa_idxset_get_by_index(c->playback_streams, index))) { + pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY); + return; + } + + s->drain_request = 0; + + if (!pa_memblockq_is_readable(s->memblockq)) + pa_pstream_send_simple_ack(c->pstream, tag); + else { + s->drain_request = 1; + s->drain_tag = tag; + } +} + /*** pstream callbacks ***/ static void packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) { diff --git a/src/simple.c b/src/simple.c index cf31ac52..50bfea43 100644 --- a/src/simple.c +++ b/src/simple.c @@ -11,23 +11,25 @@ struct pa_simple { struct pa_context *context; struct pa_stream *stream; - int dead; + int dead, drained; }; static int iterate(struct pa_simple *p, int block, int *perror) { - assert(p && p->context && p->mainloop && perror); + assert(p && p->context && p->mainloop); if (!block && !pa_context_is_pending(p->context)) return 0; do { if (pa_context_is_dead(p->context) || (p->stream && pa_stream_is_dead(p->stream))) { - *perror = pa_context_errno(p->context); + if (perror) + *perror = pa_context_errno(p->context); return -1; } if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0) { - *perror = PA_ERROR_INTERNAL; + if (perror) + *perror = PA_ERROR_INTERNAL; return -1; } } while (pa_context_is_pending(p->context)); @@ -83,7 +85,8 @@ struct pa_simple* pa_simple_new( return p; fail: - *perror = error; + if (perror) + *perror = error; pa_simple_free(p); return NULL; } @@ -132,3 +135,24 @@ int pa_simple_read(struct pa_simple *s, void*data, size_t length, int *perror) { assert(0); } + +static void drain_complete(struct pa_stream *s, void *userdata) { + struct pa_simple *p = userdata; + assert(s && p); + p->drained = 1; +} + +int pa_simple_drain(struct pa_simple *p, int *perror) { + assert(p); + p->drained = 0; + pa_stream_drain(p->stream, drain_complete, p); + + while (!p->drained) { + if (iterate(p, 1, perror) < 0) { + pa_stream_drain(p->stream, NULL, NULL); + return -1; + } + } + + return 0; +} diff --git a/src/simple.h b/src/simple.h index ed181201..f5f872ee 100644 --- a/src/simple.h +++ b/src/simple.h @@ -21,6 +21,8 @@ struct pa_simple* pa_simple_new( void pa_simple_free(struct pa_simple *s); int pa_simple_write(struct pa_simple *s, const void*data, size_t length, int *error); +int pa_simple_drain(struct pa_simple *s, int *error); + int pa_simple_read(struct pa_simple *s, void*data, size_t length, int *error); #endif diff --git a/src/todo b/src/todo index 57040cd8..93ba2821 100644 --- a/src/todo +++ b/src/todo @@ -1,6 +1,5 @@ - recording (general, simple, esound, native) - native library/protocol: - sync() function more functions (esp. latency) - simple library -- cgit