From 41295bbf56ef6df0a0e705149475d91c8d83ff3f Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Sun, 22 Aug 2004 21:13:58 +0000 Subject: new features: future cancellation corking flushing for playback streams in native protocol git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@152 fefdeb5f-60dc-0310-8127-8f9354f1896f --- polyp/memblockq.c | 165 +++++++++++++++++++++++------------------------ polyp/memblockq.h | 16 +++-- polyp/native-common.h | 3 + polyp/pacat.c | 2 +- polyp/pactl.c | 2 +- polyp/play-memchunk.c | 3 +- polyp/polyplib-context.c | 2 +- polyp/polyplib-simple.c | 2 +- polyp/polyplib-stream.c | 49 +++++++++++++- polyp/polyplib-stream.h | 35 +++++++++- polyp/protocol-esound.c | 8 +-- polyp/protocol-native.c | 68 +++++++++++++++++-- polyp/protocol-simple.c | 6 +- polyp/pstream.c | 10 +-- polyp/pstream.h | 4 +- polyp/sink-input.c | 23 +++++-- polyp/sink-input.h | 8 ++- polyp/sink.c | 2 +- polyp/util.c | 21 ++++++ polyp/util.h | 3 + 20 files changed, 306 insertions(+), 126 deletions(-) diff --git a/polyp/memblockq.c b/polyp/memblockq.c index 085c0510..b6dcca3f 100644 --- a/polyp/memblockq.c +++ b/polyp/memblockq.c @@ -28,22 +28,20 @@ #include #include #include +#include #include "memblockq.h" #include "xmalloc.h" struct memblock_list { - struct memblock_list *next; + struct memblock_list *next, *prev; struct pa_memchunk chunk; - struct timeval stamp; }; struct pa_memblockq { struct memblock_list *blocks, *blocks_tail; unsigned n_blocks; size_t current_length, maxlength, tlength, base, prebuf, minreq; - int measure_delay; - uint32_t delay; struct pa_mcalign *mcalign; struct pa_memblock_stat *memblock_stat; }; @@ -66,7 +64,7 @@ struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t b assert(bq->maxlength >= base); bq->tlength = ((tlength+base-1)/base)*base; - if (bq->tlength == 0 || bq->tlength >= bq->maxlength) + if (!bq->tlength || bq->tlength >= bq->maxlength) bq->tlength = bq->maxlength; bq->prebuf = (prebuf == (size_t) -1) ? bq->maxlength/2 : prebuf; @@ -80,29 +78,21 @@ struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t b fprintf(stderr, "memblockq sanitized: maxlength=%u, tlength=%u, base=%u, prebuf=%u, minreq=%u\n", bq->maxlength, bq->tlength, bq->base, bq->prebuf, bq->minreq); - bq->measure_delay = 0; - bq->delay = 0; - bq->mcalign = NULL; bq->memblock_stat = s; - + return bq; } void pa_memblockq_free(struct pa_memblockq* bq) { - struct memblock_list *l; assert(bq); + pa_memblockq_flush(bq); + if (bq->mcalign) pa_mcalign_free(bq->mcalign); - while ((l = bq->blocks)) { - bq->blocks = l->next; - pa_memblock_unref(l->chunk.memblock); - pa_xfree(l); - } - pa_xfree(bq); } @@ -110,31 +100,25 @@ void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk, struct memblock_list *q; assert(bq && chunk && chunk->memblock && chunk->length && (chunk->length % bq->base) == 0); + pa_memblockq_seek(bq, delta); + if (bq->blocks_tail && bq->blocks_tail->chunk.memblock == chunk->memblock) { /* Try to merge memory chunks */ if (bq->blocks_tail->chunk.index+bq->blocks_tail->chunk.length == chunk->index) { bq->blocks_tail->chunk.length += chunk->length; bq->current_length += chunk->length; - - /* fprintf(stderr, __FILE__": merge succeeded: %u\n", chunk->length);*/ return; } } q = pa_xmalloc(sizeof(struct memblock_list)); - if (bq->measure_delay) - gettimeofday(&q->stamp, NULL); - else - timerclear(&q->stamp); - q->chunk = *chunk; pa_memblock_ref(q->chunk.memblock); assert(q->chunk.index+q->chunk.length <= q->chunk.memblock->length); q->next = NULL; - - if (bq->blocks_tail) + if ((q->prev = bq->blocks_tail)) bq->blocks_tail->next = q; else bq->blocks = q; @@ -158,57 +142,43 @@ int pa_memblockq_peek(struct pa_memblockq* bq, struct pa_memchunk *chunk) { *chunk = bq->blocks->chunk; pa_memblock_ref(chunk->memblock); -/* if (chunk->memblock->ref != 2) */ -/* fprintf(stderr, "block %p with ref %u peeked.\n", chunk->memblock, chunk->memblock->ref); */ - return 0; } -/* -int memblockq_pop(struct memblockq* bq, struct pa_memchunk *chunk) { - struct memblock_list *q; - - assert(bq && chunk); - - if (!bq->blocks || bq->current_length < bq->prebuf) - return -1; - - bq->prebuf = 0; - - q = bq->blocks; - bq->blocks = bq->blocks->next; - - *chunk = q->chunk; +void pa_memblockq_drop(struct pa_memblockq *bq, const struct pa_memchunk *chunk, size_t length) { + assert(bq && chunk && length); - bq->n_blocks--; - bq->current_length -= chunk->length; + if (!bq->blocks || memcmp(&bq->blocks->chunk, chunk, sizeof(struct pa_memchunk))) + return; - pa_xfree(q); - return 0; + assert(length <= bq->blocks->chunk.length); + pa_memblockq_skip(bq, length); } -*/ -static uint32_t age(struct timeval *tv) { - struct timeval now; - uint32_t r; - assert(tv); +static void remove_block(struct pa_memblockq *bq, struct memblock_list *q) { + assert(bq && q); - if (tv->tv_sec == 0) - return 0; - - gettimeofday(&now, NULL); + if (q->prev) + q->prev->next = q->next; + else { + assert(bq->blocks == q); + bq->blocks = q->next; + } - r = (now.tv_sec-tv->tv_sec) * 1000000; - - if (now.tv_usec >= tv->tv_usec) - r += now.tv_usec - tv->tv_usec; - else - r -= tv->tv_usec - now.tv_usec; - - return r; + if (q->next) + q->next->prev = q->prev; + else { + assert(bq->blocks_tail == q); + bq->blocks_tail = q->prev; + } + + pa_memblock_unref(q->chunk.memblock); + pa_xfree(q); + + bq->n_blocks--; } -void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) { +void pa_memblockq_skip(struct pa_memblockq *bq, size_t length) { assert(bq && length && (length % bq->base) == 0); while (length > 0) { @@ -218,25 +188,12 @@ void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) { if (l > bq->blocks->chunk.length) l = bq->blocks->chunk.length; - if (bq->measure_delay) - bq->delay = age(&bq->blocks->stamp); - bq->blocks->chunk.index += l; bq->blocks->chunk.length -= l; bq->current_length -= l; - if (bq->blocks->chunk.length == 0) { - struct memblock_list *q; - - q = bq->blocks; - bq->blocks = bq->blocks->next; - if (bq->blocks == NULL) - bq->blocks_tail = NULL; - pa_memblock_unref(q->chunk.memblock); - pa_xfree(q); - - bq->n_blocks--; - } + if (!bq->blocks->chunk.length) + remove_block(bq, bq->blocks); length -= l; } @@ -255,7 +212,7 @@ void pa_memblockq_shorten(struct pa_memblockq *bq, size_t length) { l /= bq->base; l *= bq->base; - pa_memblockq_drop(bq, l); + pa_memblockq_skip(bq, l); } @@ -276,11 +233,6 @@ int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length) { return bq->current_length + length <= bq->tlength; } -uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq) { - assert(bq); - return bq->delay; -} - uint32_t pa_memblockq_get_length(struct pa_memblockq *bq) { assert(bq); return bq->current_length; @@ -331,3 +283,44 @@ void pa_memblockq_prebuf_disable(struct pa_memblockq *bq) { assert(bq); bq->prebuf = 0; } + +void pa_memblockq_seek(struct pa_memblockq *bq, size_t length) { + assert(bq); + + if (!length) + return; + + while (length >= bq->base) { + size_t l = length; + if (!bq->current_length) + return; + + assert(bq->blocks_tail); + + if (l > bq->blocks_tail->chunk.length) + l = bq->blocks_tail->chunk.length; + + bq->blocks_tail->chunk.length -= l; + bq->current_length -= l; + + if (bq->blocks_tail->chunk.length == 0) + remove_block(bq, bq->blocks); + + length -= l; + } +} + +void pa_memblockq_flush(struct pa_memblockq *bq) { + struct memblock_list *l; + assert(bq); + + while ((l = bq->blocks)) { + bq->blocks = l->next; + pa_memblock_unref(l->chunk.memblock); + pa_xfree(l); + } + + bq->blocks_tail = NULL; + bq->n_blocks = 0; + bq->current_length = 0; +} diff --git a/polyp/memblockq.h b/polyp/memblockq.h index af8fa374..277beb55 100644 --- a/polyp/memblockq.h +++ b/polyp/memblockq.h @@ -44,7 +44,7 @@ struct pa_memblockq* pa_memblockq_new(size_t maxlength, struct pa_memblock_stat *s); void pa_memblockq_free(struct pa_memblockq*bq); -/* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. This is currently not implemented, however! */ +/* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. */ void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk, size_t delta); /* Same as pa_memblockq_push(), however chunks are filtered through a mcalign object, and thus aligned to multiples of base */ @@ -53,8 +53,11 @@ void pa_memblockq_push_align(struct pa_memblockq* bq, const struct pa_memchunk * /* Return a copy of the next memory chunk in the queue. It is not removed from the queue */ int pa_memblockq_peek(struct pa_memblockq* bq, struct pa_memchunk *chunk); +/* Drop the specified bytes from the queue, only valid aufter pa_memblockq_peek() */ +void pa_memblockq_drop(struct pa_memblockq *bq, const struct pa_memchunk *chunk, size_t length); + /* Drop the specified bytes from the queue */ -void pa_memblockq_drop(struct pa_memblockq *bq, size_t length); +void pa_memblockq_skip(struct pa_memblockq *bq, size_t length); /* Shorten the pa_memblockq to the specified length by dropping data at the end of the queue */ void pa_memblockq_shorten(struct pa_memblockq *bq, size_t length); @@ -68,9 +71,6 @@ int pa_memblockq_is_readable(struct pa_memblockq *bq); /* Test if the pa_memblockq is currently writable for the specified amount of bytes */ int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length); -/* The time memory chunks stay in the queue until they are removed completely in usecs */ -uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq); - /* Return the length of the queue in bytes */ uint32_t pa_memblockq_get_length(struct pa_memblockq *bq); @@ -83,4 +83,10 @@ uint32_t pa_memblockq_get_minreq(struct pa_memblockq *bq); /* Force disabling of pre-buf even when the pre-buffer is not yet filled */ void pa_memblockq_prebuf_disable(struct pa_memblockq *bq); +/* Manipulate the write pointer */ +void pa_memblockq_seek(struct pa_memblockq *bq, size_t delta); + +/* Flush the queue */ +void pa_memblockq_flush(struct pa_memblockq *bq); + #endif diff --git a/polyp/native-common.h b/polyp/native-common.h index b921ccc2..d826837a 100644 --- a/polyp/native-common.h +++ b/polyp/native-common.h @@ -75,6 +75,9 @@ enum { PA_COMMAND_SET_SINK_VOLUME, PA_COMMAND_SET_SINK_INPUT_VOLUME, + + PA_COMMAND_CORK_PLAYBACK_STREAM, + PA_COMMAND_FLUSH_PLAYBACK_STREAM, PA_COMMAND_MAX }; diff --git a/polyp/pacat.c b/polyp/pacat.c index 2c7044b8..198776d3 100644 --- a/polyp/pacat.c +++ b/polyp/pacat.c @@ -65,7 +65,7 @@ static void do_stream_write(size_t length) { if (l > buffer_length) l = buffer_length; - pa_stream_write(stream, buffer+buffer_index, l, NULL); + pa_stream_write(stream, buffer+buffer_index, l, NULL, 0); buffer_length -= l; buffer_index += l; diff --git a/polyp/pactl.c b/polyp/pactl.c index f2556706..dfa11b70 100644 --- a/polyp/pactl.c +++ b/polyp/pactl.c @@ -150,7 +150,7 @@ static void stream_write_callback(struct pa_stream *s, size_t length, void *user quit(1); } - pa_stream_write(s, d, length, free); + pa_stream_write(s, d, length, free, 0); sample_length -= length; diff --git a/polyp/play-memchunk.c b/polyp/play-memchunk.c index 5c423567..b94a0524 100644 --- a/polyp/play-memchunk.c +++ b/polyp/play-memchunk.c @@ -59,11 +59,12 @@ static void si_kill(struct pa_mainloop_api *m, void *i) { sink_input_kill(i); } -static void sink_input_drop(struct pa_sink_input *i, size_t length) { +static void sink_input_drop(struct pa_sink_input *i, const struct pa_memchunk*chunk, size_t length) { struct pa_memchunk *c; assert(i && length && i->userdata); c = i->userdata; + assert(chunk == c); assert(length <= c->length); c->length -= length; diff --git a/polyp/polyplib-context.c b/polyp/polyplib-context.c index d048cda9..9acb2d70 100644 --- a/polyp/polyplib-context.c +++ b/polyp/polyplib-context.c @@ -193,7 +193,7 @@ static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *pack pa_context_unref(c); } -static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) { +static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) { struct pa_context *c = userdata; struct pa_stream *s; assert(p && chunk && c && chunk->memblock && chunk->memblock->data); diff --git a/polyp/polyplib-simple.c b/polyp/polyplib-simple.c index 66ee5995..c71d59a4 100644 --- a/polyp/polyplib-simple.c +++ b/polyp/polyplib-simple.c @@ -187,7 +187,7 @@ int pa_simple_write(struct pa_simple *p, const void*data, size_t length, int *pe if (l > length) l = length; - pa_stream_write(p->stream, data, l, NULL); + pa_stream_write(p->stream, data, l, NULL, 0); data += l; length -= l; } diff --git a/polyp/polyplib-stream.c b/polyp/polyplib-stream.c index 451dd046..c0ec9e7e 100644 --- a/polyp/polyplib-stream.c +++ b/polyp/polyplib-stream.c @@ -267,7 +267,7 @@ void pa_stream_connect_record(struct pa_stream *s, const char *dev, const struct create_stream(s, dev, attr); } -void pa_stream_write(struct pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p)) { +void pa_stream_write(struct pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p), size_t delta) { struct pa_memchunk chunk; assert(s && s->context && data && length && s->state == PA_STREAM_READY && s->ref >= 1); @@ -282,7 +282,7 @@ void pa_stream_write(struct pa_stream *s, const void *data, size_t length, void chunk.index = 0; chunk.length = length; - pa_pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk); + pa_pstream_send_memblock(s->context->pstream, s->channel, delta, &chunk); pa_memblock_unref(chunk.memblock); if (length < s->requested_bytes) @@ -452,3 +452,48 @@ finish: pa_operation_done(o); pa_operation_unref(o); } + +struct pa_operation* pa_stream_cork(struct pa_stream *s, int b, void (*cb) (struct pa_stream*s, int success, void *userdata), void *userdata) { + struct pa_operation *o; + struct pa_tagstruct *t; + uint32_t tag; + assert(s && s->ref >= 1 && s->state == PA_STREAM_READY); + + o = pa_operation_new(s->context, s); + assert(o); + o->callback = cb; + o->userdata = userdata; + + t = pa_tagstruct_new(NULL, 0); + assert(t); + pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM); + pa_tagstruct_putu32(t, tag = s->context->ctag++); + pa_tagstruct_putu32(t, s->channel); + pa_tagstruct_putu32(t, !!b); + pa_pstream_send_tagstruct(s->context->pstream, t); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o); + + return pa_operation_ref(o); +} + +struct pa_operation* pa_stream_flush(struct pa_stream *s, void (*cb)(struct pa_stream *s, int success, void *userdata), void *userdata) { + struct pa_operation *o; + struct pa_tagstruct *t; + uint32_t tag; + assert(s && s->ref >= 1 && s->state == PA_STREAM_READY); + + o = pa_operation_new(s->context, s); + assert(o); + o->callback = cb; + o->userdata = userdata; + + t = pa_tagstruct_new(NULL, 0); + assert(t); + pa_tagstruct_putu32(t, PA_COMMAND_FLUSH_PLAYBACK_STREAM); + pa_tagstruct_putu32(t, tag = s->context->ctag++); + pa_tagstruct_putu32(t, s->channel); + pa_pstream_send_tagstruct(s->context->pstream, t); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o); + + return pa_operation_ref(o); +} diff --git a/polyp/polyplib-stream.h b/polyp/polyplib-stream.h index 1a9d58dd..ff313326 100644 --- a/polyp/polyplib-stream.h +++ b/polyp/polyplib-stream.h @@ -70,7 +70,30 @@ void pa_stream_disconnect(struct pa_stream *s); * and an internal reference to the specified data is kept, the data * is not copied. If NULL, the data is copied into an internal * buffer. */ -void pa_stream_write(struct pa_stream *p, const void *data, size_t length, void (*free_cb)(void *p)); +void pa_stream_write(struct pa_stream *p /**< The stream to use */, + const void *data /**< The data to write */, + size_t length /**< The length of the data to write */, + void (*free_cb)(void *p) /**< A cleanup routine for the data or NULL to request an internal copy */, + size_t delta /**< Drop this many + bytes in the playback + buffer before writing + this data. Use + (size_t) -1 for + clearing the whole + playback + buffer. Normally you + will specify 0 here, + .i.e. append to the + playback buffer. If + the value given here + is greater than the + buffered data length + the buffer is cleared + and the data is + written to the + buffer's start. This + value is ignored on + upload streams. */); /** Return the amount of bytes that may be written using pa_stream_write() */ size_t pa_stream_writable_size(struct pa_stream *p); @@ -90,6 +113,16 @@ void pa_stream_set_write_callback(struct pa_stream *p, void (*cb)(struct pa_stre /** Set the callback function that is called when new data is available from the stream */ void pa_stream_set_read_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata); +/** Pause (or resume) playback of this stream temporarily + * \since 0.3 */ +struct pa_operation* pa_stream_cork(struct pa_stream *s, int b, void (*cb) (struct pa_stream*s, int success, void *userdata), void *userdata); + +/** Flush the playback buffer of this stream. Most of the time you're + * better off using the delta of pa_stream_write() instead of this + * function. + * \since 0.3*/ +struct pa_operation* pa_stream_flush(struct pa_stream *s, void (*cb)(struct pa_stream *s, int success, void *userdata), void *userdata); + PA_C_DECL_END #endif diff --git a/polyp/protocol-esound.c b/polyp/protocol-esound.c index be2ef2b9..5102540b 100644 --- a/polyp/protocol-esound.c +++ b/polyp/protocol-esound.c @@ -102,7 +102,7 @@ typedef struct proto_handler { const char *description; } esd_proto_handler_info_t; -static void sink_input_drop_cb(struct pa_sink_input *i, size_t length); +static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length); static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk); static void sink_input_kill_cb(struct pa_sink_input *i); static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i); @@ -835,7 +835,7 @@ static int do_write(struct connection *c) { return -1; } - pa_memblockq_drop(c->output_memblockq, r); + pa_memblockq_drop(c->output_memblockq, &chunk, r); pa_memblock_unref(chunk.memblock); } @@ -894,11 +894,11 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk return 0; } -static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) { +static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) { struct connection*c = i->userdata; assert(i && c && length); - pa_memblockq_drop(c->input_memblockq, length); + pa_memblockq_drop(c->input_memblockq, chunk, length); /* do something */ assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); diff --git a/polyp/protocol-native.c b/polyp/protocol-native.c index c8e5137e..8b39482c 100644 --- a/polyp/protocol-native.c +++ b/polyp/protocol-native.c @@ -107,7 +107,7 @@ struct pa_protocol_native { }; static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk); -static void sink_input_drop_cb(struct pa_sink_input *i, size_t length); +static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length); static void sink_input_kill_cb(struct pa_sink_input *i); static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i); @@ -135,6 +135,8 @@ static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uin static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +static void command_flush_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { [PA_COMMAND_ERROR] = { NULL }, @@ -176,6 +178,8 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { [PA_COMMAND_SUBSCRIBE] = { command_subscribe }, [PA_COMMAND_SET_SINK_VOLUME] = { command_set_volume }, [PA_COMMAND_SET_SINK_INPUT_VOLUME] = { command_set_volume }, + [PA_COMMAND_CORK_PLAYBACK_STREAM] = { command_cork_playback_stream }, + [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = { command_flush_playback_stream }, }; /* structure management */ @@ -376,7 +380,7 @@ static void send_memblock(struct connection *c) { chunk.length = r->fragment_size; pa_pstream_send_memblock(c->pstream, r->index, 0, &chunk); - pa_memblockq_drop(r->memblockq, chunk.length); + pa_memblockq_drop(r->memblockq, &chunk, chunk.length); pa_memblock_unref(chunk.memblock); return; @@ -422,12 +426,12 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk return 0; } -static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) { +static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) { struct playback_stream *s; assert(i && i->userdata && length); s = i->userdata; - pa_memblockq_drop(s->memblockq, length); + pa_memblockq_drop(s->memblockq, chunk, length); request_bytes(s); if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) { @@ -1293,6 +1297,59 @@ static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32 pa_pstream_send_simple_ack(c->pstream, tag); } +static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { + struct connection *c = userdata; + uint32_t index; + uint32_t b; + struct playback_stream *s; + assert(c && t); + + if (pa_tagstruct_getu32(t, &index) < 0 || + pa_tagstruct_getu32(t, &b) < 0 || + !pa_tagstruct_eof(t)) { + protocol_error(c); + return; + } + + if (!c->authorized) { + pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); + return; + } + + if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) { + pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY); + return; + } + + pa_sink_input_cork(s->sink_input, b); + pa_pstream_send_simple_ack(c->pstream, tag); +} + +static void command_flush_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { + struct connection *c = userdata; + uint32_t index; + struct playback_stream *s; + assert(c && t); + + if (pa_tagstruct_getu32(t, &index) < 0 || + !pa_tagstruct_eof(t)) { + protocol_error(c); + return; + } + + if (!c->authorized) { + pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); + return; + } + + if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) { + pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY); + return; + } + + pa_memblockq_flush(s->memblockq); + pa_pstream_send_simple_ack(c->pstream, tag); +} /*** pstream callbacks ***/ @@ -1306,7 +1363,7 @@ static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *pack } } -static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) { +static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) { struct connection *c = userdata; struct output_stream *stream; assert(p && chunk && userdata); @@ -1338,7 +1395,6 @@ static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, in u->memchunk = *chunk; pa_memblock_ref(u->memchunk.memblock); u->length = 0; - fprintf(stderr, "COPY\n"); } else { u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat); u->memchunk.index = u->memchunk.length = 0; diff --git a/polyp/protocol-simple.c b/polyp/protocol-simple.c index 58343486..b03c2e54 100644 --- a/polyp/protocol-simple.c +++ b/polyp/protocol-simple.c @@ -159,7 +159,7 @@ static int do_write(struct connection *c) { return -1; } - pa_memblockq_drop(c->output_memblockq, r); + pa_memblockq_drop(c->output_memblockq, &chunk, r); pa_memblock_unref(chunk.memblock); return 0; @@ -202,11 +202,11 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk return 0; } -static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) { +static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) { struct connection*c = i->userdata; assert(i && c && length); - pa_memblockq_drop(c->input_memblockq, length); + pa_memblockq_drop(c->input_memblockq, chunk, length); /* do something */ assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); diff --git a/polyp/pstream.c b/polyp/pstream.c index 5664e18a..ad3dd0e0 100644 --- a/polyp/pstream.c +++ b/polyp/pstream.c @@ -50,7 +50,7 @@ struct item_info { /* memblock info */ struct pa_memchunk chunk; uint32_t channel; - int32_t delta; + uint32_t delta; /* packet info */ struct pa_packet *packet; @@ -86,7 +86,7 @@ struct pa_pstream { void (*recieve_packet_callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata); void *recieve_packet_callback_userdata; - void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata); + void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata); void *recieve_memblock_callback_userdata; void (*drain_callback)(struct pa_pstream *p, void *userdata); @@ -219,7 +219,7 @@ void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet) { p->mainloop->defer_enable(p->defer_event, 1); } -void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk) { +void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk) { struct item_info *i; assert(p && channel != (uint32_t) -1 && chunk); @@ -242,7 +242,7 @@ void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callbac p->recieve_packet_callback_userdata = userdata; } -void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata) { +void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata) { assert(p && callback); p->recieve_memblock_callback = callback; @@ -378,7 +378,7 @@ static void do_read(struct pa_pstream *p) { p->recieve_memblock_callback( p, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), - (int32_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]), + ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]), &chunk, p->recieve_memblock_callback_userdata); } diff --git a/polyp/pstream.h b/polyp/pstream.h index 9a289507..dfd29983 100644 --- a/polyp/pstream.h +++ b/polyp/pstream.h @@ -37,10 +37,10 @@ void pa_pstream_unref(struct pa_pstream*p); struct pa_pstream* pa_pstream_ref(struct pa_pstream*p); void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet); -void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk); +void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk); void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata); -void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata); +void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata); void pa_pstream_set_drain_callback(struct pa_pstream *p, void (*cb)(struct pa_pstream *p, void *userdata), void *userdata); void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct pa_pstream *p, void *userdata), void *userdata); diff --git a/polyp/sink-input.c b/polyp/sink-input.c index c57dd8e0..5009033f 100644 --- a/polyp/sink-input.c +++ b/polyp/sink-input.c @@ -59,6 +59,7 @@ struct pa_sink_input* pa_sink_input_new(struct pa_sink *s, const char *name, con i->get_latency = NULL; i->userdata = NULL; + i->corked = 0; i->volume = PA_VOLUME_NORM; i->resampled_chunk.memblock = NULL; @@ -120,6 +121,9 @@ uint32_t pa_sink_input_get_latency(struct pa_sink_input *i) { int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk) { assert(i && chunk && i->peek && i->drop); + if (i->corked == 0) + return -1; + if (!i->resampler) return i->peek(i, chunk); @@ -134,11 +138,12 @@ int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk) { assert(tchunk.length); l = pa_resampler_request(i->resampler, CONVERT_BUFFER_LENGTH); + + i->drop(i, &tchunk, l); + if (tchunk.length > l) tchunk.length = l; - i->drop(i, tchunk.length); - pa_resampler_run(i->resampler, &tchunk, &i->resampled_chunk); pa_memblock_unref(tchunk.memblock); } @@ -149,11 +154,11 @@ int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk) { return 0; } -void pa_sink_input_drop(struct pa_sink_input *i, size_t length) { +void pa_sink_input_drop(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) { assert(i && length); if (!i->resampler) { - i->drop(i, length); + i->drop(i, chunk, length); return; } @@ -177,3 +182,13 @@ void pa_sink_input_set_volume(struct pa_sink_input *i, pa_volume_t volume) { pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index); } } + +void pa_sink_input_cork(struct pa_sink_input *i, int b) { + int n; + assert(i); + n = i->corked && !b; + i->corked = b; + + if (n) + pa_sink_notify(i->sink); +} diff --git a/polyp/sink-input.h b/polyp/sink-input.h index 8d7788d8..b0644540 100644 --- a/polyp/sink-input.h +++ b/polyp/sink-input.h @@ -34,6 +34,8 @@ struct pa_sink_input { uint32_t index; + int corked; + char *name; struct pa_module *owner; struct pa_client *client; @@ -42,7 +44,7 @@ struct pa_sink_input { uint32_t volume; int (*peek) (struct pa_sink_input *i, struct pa_memchunk *chunk); - void (*drop) (struct pa_sink_input *i, size_t length); + void (*drop) (struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length); void (*kill) (struct pa_sink_input *i); uint32_t (*get_latency) (struct pa_sink_input *i); @@ -62,8 +64,10 @@ void pa_sink_input_kill(struct pa_sink_input *i); uint32_t pa_sink_input_get_latency(struct pa_sink_input *i); int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk); -void pa_sink_input_drop(struct pa_sink_input *i, size_t length); +void pa_sink_input_drop(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length); void pa_sink_input_set_volume(struct pa_sink_input *i, pa_volume_t volume); +void pa_sink_input_cork(struct pa_sink_input *i, int b); + #endif diff --git a/polyp/sink.c b/polyp/sink.c index 62b9a7af..43fd351c 100644 --- a/polyp/sink.c +++ b/polyp/sink.c @@ -147,8 +147,8 @@ static void inputs_drop(struct pa_sink *s, struct pa_mix_info *info, unsigned ma struct pa_sink_input *i = info->userdata; assert(i && info->chunk.memblock); + pa_sink_input_drop(i, &info->chunk, length); pa_memblock_unref(info->chunk.memblock); - pa_sink_input_drop(i, length); } } diff --git a/polyp/util.c b/polyp/util.c index 2878c546..6c8febb6 100644 --- a/polyp/util.c +++ b/polyp/util.c @@ -36,6 +36,7 @@ #include #include #include +#include #include "util.h" #include "xmalloc.h" @@ -192,3 +193,23 @@ char *pa_get_host_name(char *s, size_t l) { s[l-1] = 0; return s; } + +uint32_t pa_age(struct timeval *tv) { + struct timeval now; + uint32_t r; + assert(tv); + + if (tv->tv_sec == 0) + return 0; + + gettimeofday(&now, NULL); + + r = (now.tv_sec-tv->tv_sec) * 1000000; + + if (now.tv_usec >= tv->tv_usec) + r += now.tv_usec - tv->tv_usec; + else + r -= tv->tv_usec - now.tv_usec; + + return r; +} diff --git a/polyp/util.h b/polyp/util.h index 7dd7b7de..9dab45d2 100644 --- a/polyp/util.h +++ b/polyp/util.h @@ -23,6 +23,7 @@ ***/ #include +#include void pa_make_nonblock_fd(int fd); @@ -38,4 +39,6 @@ char *pa_sprintf_malloc(const char *format, ...) __attribute__ ((format (printf, char *pa_get_user_name(char *s, size_t l); char *pa_get_host_name(char *s, size_t l); +uint32_t pa_age(struct timeval *tv); + #endif -- cgit