summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2004-08-22 21:13:58 +0000
committerLennart Poettering <lennart@poettering.net>2004-08-22 21:13:58 +0000
commit41295bbf56ef6df0a0e705149475d91c8d83ff3f (patch)
treecd7d52d972d24fa3b6298d1a19a213ce25dcf44a
parentea4805a0fd4aea6db4c99e1187aca8e013e4dc24 (diff)
new features:
future cancellation corking flushing for playback streams in native protocol git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@152 fefdeb5f-60dc-0310-8127-8f9354f1896f
-rw-r--r--polyp/memblockq.c165
-rw-r--r--polyp/memblockq.h16
-rw-r--r--polyp/native-common.h3
-rw-r--r--polyp/pacat.c2
-rw-r--r--polyp/pactl.c2
-rw-r--r--polyp/play-memchunk.c3
-rw-r--r--polyp/polyplib-context.c2
-rw-r--r--polyp/polyplib-simple.c2
-rw-r--r--polyp/polyplib-stream.c49
-rw-r--r--polyp/polyplib-stream.h35
-rw-r--r--polyp/protocol-esound.c8
-rw-r--r--polyp/protocol-native.c68
-rw-r--r--polyp/protocol-simple.c6
-rw-r--r--polyp/pstream.c10
-rw-r--r--polyp/pstream.h4
-rw-r--r--polyp/sink-input.c23
-rw-r--r--polyp/sink-input.h8
-rw-r--r--polyp/sink.c2
-rw-r--r--polyp/util.c21
-rw-r--r--polyp/util.h3
20 files changed, 306 insertions, 126 deletions
diff --git a/polyp/memblockq.c b/polyp/memblockq.c
index 085c0510..b6dcca3f 100644
--- a/polyp/memblockq.c
+++ b/polyp/memblockq.c
@@ -28,22 +28,20 @@
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
+#include <string.h>
#include "memblockq.h"
#include "xmalloc.h"
struct memblock_list {
- struct memblock_list *next;
+ struct memblock_list *next, *prev;
struct pa_memchunk chunk;
- struct timeval stamp;
};
struct pa_memblockq {
struct memblock_list *blocks, *blocks_tail;
unsigned n_blocks;
size_t current_length, maxlength, tlength, base, prebuf, minreq;
- int measure_delay;
- uint32_t delay;
struct pa_mcalign *mcalign;
struct pa_memblock_stat *memblock_stat;
};
@@ -66,7 +64,7 @@ struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t b
assert(bq->maxlength >= base);
bq->tlength = ((tlength+base-1)/base)*base;
- if (bq->tlength == 0 || bq->tlength >= bq->maxlength)
+ if (!bq->tlength || bq->tlength >= bq->maxlength)
bq->tlength = bq->maxlength;
bq->prebuf = (prebuf == (size_t) -1) ? bq->maxlength/2 : prebuf;
@@ -80,29 +78,21 @@ struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t b
fprintf(stderr, "memblockq sanitized: maxlength=%u, tlength=%u, base=%u, prebuf=%u, minreq=%u\n", bq->maxlength, bq->tlength, bq->base, bq->prebuf, bq->minreq);
- bq->measure_delay = 0;
- bq->delay = 0;
-
bq->mcalign = NULL;
bq->memblock_stat = s;
-
+
return bq;
}
void pa_memblockq_free(struct pa_memblockq* bq) {
- struct memblock_list *l;
assert(bq);
+ pa_memblockq_flush(bq);
+
if (bq->mcalign)
pa_mcalign_free(bq->mcalign);
- while ((l = bq->blocks)) {
- bq->blocks = l->next;
- pa_memblock_unref(l->chunk.memblock);
- pa_xfree(l);
- }
-
pa_xfree(bq);
}
@@ -110,31 +100,25 @@ void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk,
struct memblock_list *q;
assert(bq && chunk && chunk->memblock && chunk->length && (chunk->length % bq->base) == 0);
+ pa_memblockq_seek(bq, delta);
+
if (bq->blocks_tail && bq->blocks_tail->chunk.memblock == chunk->memblock) {
/* Try to merge memory chunks */
if (bq->blocks_tail->chunk.index+bq->blocks_tail->chunk.length == chunk->index) {
bq->blocks_tail->chunk.length += chunk->length;
bq->current_length += chunk->length;
-
- /* fprintf(stderr, __FILE__": merge succeeded: %u\n", chunk->length);*/
return;
}
}
q = pa_xmalloc(sizeof(struct memblock_list));
- if (bq->measure_delay)
- gettimeofday(&q->stamp, NULL);
- else
- timerclear(&q->stamp);
-
q->chunk = *chunk;
pa_memblock_ref(q->chunk.memblock);
assert(q->chunk.index+q->chunk.length <= q->chunk.memblock->length);
q->next = NULL;
-
- if (bq->blocks_tail)
+ if ((q->prev = bq->blocks_tail))
bq->blocks_tail->next = q;
else
bq->blocks = q;
@@ -158,57 +142,43 @@ int pa_memblockq_peek(struct pa_memblockq* bq, struct pa_memchunk *chunk) {
*chunk = bq->blocks->chunk;
pa_memblock_ref(chunk->memblock);
-/* if (chunk->memblock->ref != 2) */
-/* fprintf(stderr, "block %p with ref %u peeked.\n", chunk->memblock, chunk->memblock->ref); */
-
return 0;
}
-/*
-int memblockq_pop(struct memblockq* bq, struct pa_memchunk *chunk) {
- struct memblock_list *q;
-
- assert(bq && chunk);
-
- if (!bq->blocks || bq->current_length < bq->prebuf)
- return -1;
-
- bq->prebuf = 0;
-
- q = bq->blocks;
- bq->blocks = bq->blocks->next;
-
- *chunk = q->chunk;
+void pa_memblockq_drop(struct pa_memblockq *bq, const struct pa_memchunk *chunk, size_t length) {
+ assert(bq && chunk && length);
- bq->n_blocks--;
- bq->current_length -= chunk->length;
+ if (!bq->blocks || memcmp(&bq->blocks->chunk, chunk, sizeof(struct pa_memchunk)))
+ return;
- pa_xfree(q);
- return 0;
+ assert(length <= bq->blocks->chunk.length);
+ pa_memblockq_skip(bq, length);
}
-*/
-static uint32_t age(struct timeval *tv) {
- struct timeval now;
- uint32_t r;
- assert(tv);
+static void remove_block(struct pa_memblockq *bq, struct memblock_list *q) {
+ assert(bq && q);
- if (tv->tv_sec == 0)
- return 0;
-
- gettimeofday(&now, NULL);
+ if (q->prev)
+ q->prev->next = q->next;
+ else {
+ assert(bq->blocks == q);
+ bq->blocks = q->next;
+ }
- r = (now.tv_sec-tv->tv_sec) * 1000000;
-
- if (now.tv_usec >= tv->tv_usec)
- r += now.tv_usec - tv->tv_usec;
- else
- r -= tv->tv_usec - now.tv_usec;
-
- return r;
+ if (q->next)
+ q->next->prev = q->prev;
+ else {
+ assert(bq->blocks_tail == q);
+ bq->blocks_tail = q->prev;
+ }
+
+ pa_memblock_unref(q->chunk.memblock);
+ pa_xfree(q);
+
+ bq->n_blocks--;
}
-void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) {
+void pa_memblockq_skip(struct pa_memblockq *bq, size_t length) {
assert(bq && length && (length % bq->base) == 0);
while (length > 0) {
@@ -218,25 +188,12 @@ void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) {
if (l > bq->blocks->chunk.length)
l = bq->blocks->chunk.length;
- if (bq->measure_delay)
- bq->delay = age(&bq->blocks->stamp);
-
bq->blocks->chunk.index += l;
bq->blocks->chunk.length -= l;
bq->current_length -= l;
- if (bq->blocks->chunk.length == 0) {
- struct memblock_list *q;
-
- q = bq->blocks;
- bq->blocks = bq->blocks->next;
- if (bq->blocks == NULL)
- bq->blocks_tail = NULL;
- pa_memblock_unref(q->chunk.memblock);
- pa_xfree(q);
-
- bq->n_blocks--;
- }
+ if (!bq->blocks->chunk.length)
+ remove_block(bq, bq->blocks);
length -= l;
}
@@ -255,7 +212,7 @@ void pa_memblockq_shorten(struct pa_memblockq *bq, size_t length) {
l /= bq->base;
l *= bq->base;
- pa_memblockq_drop(bq, l);
+ pa_memblockq_skip(bq, l);
}
@@ -276,11 +233,6 @@ int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length) {
return bq->current_length + length <= bq->tlength;
}
-uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq) {
- assert(bq);
- return bq->delay;
-}
-
uint32_t pa_memblockq_get_length(struct pa_memblockq *bq) {
assert(bq);
return bq->current_length;
@@ -331,3 +283,44 @@ void pa_memblockq_prebuf_disable(struct pa_memblockq *bq) {
assert(bq);
bq->prebuf = 0;
}
+
+void pa_memblockq_seek(struct pa_memblockq *bq, size_t length) {
+ assert(bq);
+
+ if (!length)
+ return;
+
+ while (length >= bq->base) {
+ size_t l = length;
+ if (!bq->current_length)
+ return;
+
+ assert(bq->blocks_tail);
+
+ if (l > bq->blocks_tail->chunk.length)
+ l = bq->blocks_tail->chunk.length;
+
+ bq->blocks_tail->chunk.length -= l;
+ bq->current_length -= l;
+
+ if (bq->blocks_tail->chunk.length == 0)
+ remove_block(bq, bq->blocks);
+
+ length -= l;
+ }
+}
+
+void pa_memblockq_flush(struct pa_memblockq *bq) {
+ struct memblock_list *l;
+ assert(bq);
+
+ while ((l = bq->blocks)) {
+ bq->blocks = l->next;
+ pa_memblock_unref(l->chunk.memblock);
+ pa_xfree(l);
+ }
+
+ bq->blocks_tail = NULL;
+ bq->n_blocks = 0;
+ bq->current_length = 0;
+}
diff --git a/polyp/memblockq.h b/polyp/memblockq.h
index af8fa374..277beb55 100644
--- a/polyp/memblockq.h
+++ b/polyp/memblockq.h
@@ -44,7 +44,7 @@ struct pa_memblockq* pa_memblockq_new(size_t maxlength,
struct pa_memblock_stat *s);
void pa_memblockq_free(struct pa_memblockq*bq);
-/* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. This is currently not implemented, however! */
+/* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. */
void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk, size_t delta);
/* Same as pa_memblockq_push(), however chunks are filtered through a mcalign object, and thus aligned to multiples of base */
@@ -53,8 +53,11 @@ void pa_memblockq_push_align(struct pa_memblockq* bq, const struct pa_memchunk *
/* Return a copy of the next memory chunk in the queue. It is not removed from the queue */
int pa_memblockq_peek(struct pa_memblockq* bq, struct pa_memchunk *chunk);
+/* Drop the specified bytes from the queue, only valid aufter pa_memblockq_peek() */
+void pa_memblockq_drop(struct pa_memblockq *bq, const struct pa_memchunk *chunk, size_t length);
+
/* Drop the specified bytes from the queue */
-void pa_memblockq_drop(struct pa_memblockq *bq, size_t length);
+void pa_memblockq_skip(struct pa_memblockq *bq, size_t length);
/* Shorten the pa_memblockq to the specified length by dropping data at the end of the queue */
void pa_memblockq_shorten(struct pa_memblockq *bq, size_t length);
@@ -68,9 +71,6 @@ int pa_memblockq_is_readable(struct pa_memblockq *bq);
/* Test if the pa_memblockq is currently writable for the specified amount of bytes */
int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length);
-/* The time memory chunks stay in the queue until they are removed completely in usecs */
-uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq);
-
/* Return the length of the queue in bytes */
uint32_t pa_memblockq_get_length(struct pa_memblockq *bq);
@@ -83,4 +83,10 @@ uint32_t pa_memblockq_get_minreq(struct pa_memblockq *bq);
/* Force disabling of pre-buf even when the pre-buffer is not yet filled */
void pa_memblockq_prebuf_disable(struct pa_memblockq *bq);
+/* Manipulate the write pointer */
+void pa_memblockq_seek(struct pa_memblockq *bq, size_t delta);
+
+/* Flush the queue */
+void pa_memblockq_flush(struct pa_memblockq *bq);
+
#endif
diff --git a/polyp/native-common.h b/polyp/native-common.h
index b921ccc2..d826837a 100644
--- a/polyp/native-common.h
+++ b/polyp/native-common.h
@@ -75,6 +75,9 @@ enum {
PA_COMMAND_SET_SINK_VOLUME,
PA_COMMAND_SET_SINK_INPUT_VOLUME,
+
+ PA_COMMAND_CORK_PLAYBACK_STREAM,
+ PA_COMMAND_FLUSH_PLAYBACK_STREAM,
PA_COMMAND_MAX
};
diff --git a/polyp/pacat.c b/polyp/pacat.c
index 2c7044b8..198776d3 100644
--- a/polyp/pacat.c
+++ b/polyp/pacat.c
@@ -65,7 +65,7 @@ static void do_stream_write(size_t length) {
if (l > buffer_length)
l = buffer_length;
- pa_stream_write(stream, buffer+buffer_index, l, NULL);
+ pa_stream_write(stream, buffer+buffer_index, l, NULL, 0);
buffer_length -= l;
buffer_index += l;
diff --git a/polyp/pactl.c b/polyp/pactl.c
index f2556706..dfa11b70 100644
--- a/polyp/pactl.c
+++ b/polyp/pactl.c
@@ -150,7 +150,7 @@ static void stream_write_callback(struct pa_stream *s, size_t length, void *user
quit(1);
}
- pa_stream_write(s, d, length, free);
+ pa_stream_write(s, d, length, free, 0);
sample_length -= length;
diff --git a/polyp/play-memchunk.c b/polyp/play-memchunk.c
index 5c423567..b94a0524 100644
--- a/polyp/play-memchunk.c
+++ b/polyp/play-memchunk.c
@@ -59,11 +59,12 @@ static void si_kill(struct pa_mainloop_api *m, void *i) {
sink_input_kill(i);
}
-static void sink_input_drop(struct pa_sink_input *i, size_t length) {
+static void sink_input_drop(struct pa_sink_input *i, const struct pa_memchunk*chunk, size_t length) {
struct pa_memchunk *c;
assert(i && length && i->userdata);
c = i->userdata;
+ assert(chunk == c);
assert(length <= c->length);
c->length -= length;
diff --git a/polyp/polyplib-context.c b/polyp/polyplib-context.c
index d048cda9..9acb2d70 100644
--- a/polyp/polyplib-context.c
+++ b/polyp/polyplib-context.c
@@ -193,7 +193,7 @@ static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *pack
pa_context_unref(c);
}
-static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) {
+static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) {
struct pa_context *c = userdata;
struct pa_stream *s;
assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
diff --git a/polyp/polyplib-simple.c b/polyp/polyplib-simple.c
index 66ee5995..c71d59a4 100644
--- a/polyp/polyplib-simple.c
+++ b/polyp/polyplib-simple.c
@@ -187,7 +187,7 @@ int pa_simple_write(struct pa_simple *p, const void*data, size_t length, int *pe
if (l > length)
l = length;
- pa_stream_write(p->stream, data, l, NULL);
+ pa_stream_write(p->stream, data, l, NULL, 0);
data += l;
length -= l;
}
diff --git a/polyp/polyplib-stream.c b/polyp/polyplib-stream.c
index 451dd046..c0ec9e7e 100644
--- a/polyp/polyplib-stream.c
+++ b/polyp/polyplib-stream.c
@@ -267,7 +267,7 @@ void pa_stream_connect_record(struct pa_stream *s, const char *dev, const struct
create_stream(s, dev, attr);
}
-void pa_stream_write(struct pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p)) {
+void pa_stream_write(struct pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p), size_t delta) {
struct pa_memchunk chunk;
assert(s && s->context && data && length && s->state == PA_STREAM_READY && s->ref >= 1);
@@ -282,7 +282,7 @@ void pa_stream_write(struct pa_stream *s, const void *data, size_t length, void
chunk.index = 0;
chunk.length = length;
- pa_pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
+ pa_pstream_send_memblock(s->context->pstream, s->channel, delta, &chunk);
pa_memblock_unref(chunk.memblock);
if (length < s->requested_bytes)
@@ -452,3 +452,48 @@ finish:
pa_operation_done(o);
pa_operation_unref(o);
}
+
+struct pa_operation* pa_stream_cork(struct pa_stream *s, int b, void (*cb) (struct pa_stream*s, int success, void *userdata), void *userdata) {
+ struct pa_operation *o;
+ struct pa_tagstruct *t;
+ uint32_t tag;
+ assert(s && s->ref >= 1 && s->state == PA_STREAM_READY);
+
+ o = pa_operation_new(s->context, s);
+ assert(o);
+ o->callback = cb;
+ o->userdata = userdata;
+
+ t = pa_tagstruct_new(NULL, 0);
+ assert(t);
+ pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
+ pa_tagstruct_putu32(t, tag = s->context->ctag++);
+ pa_tagstruct_putu32(t, s->channel);
+ pa_tagstruct_putu32(t, !!b);
+ pa_pstream_send_tagstruct(s->context->pstream, t);
+ pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
+
+ return pa_operation_ref(o);
+}
+
+struct pa_operation* pa_stream_flush(struct pa_stream *s, void (*cb)(struct pa_stream *s, int success, void *userdata), void *userdata) {
+ struct pa_operation *o;
+ struct pa_tagstruct *t;
+ uint32_t tag;
+ assert(s && s->ref >= 1 && s->state == PA_STREAM_READY);
+
+ o = pa_operation_new(s->context, s);
+ assert(o);
+ o->callback = cb;
+ o->userdata = userdata;
+
+ t = pa_tagstruct_new(NULL, 0);
+ assert(t);
+ pa_tagstruct_putu32(t, PA_COMMAND_FLUSH_PLAYBACK_STREAM);
+ pa_tagstruct_putu32(t, tag = s->context->ctag++);
+ pa_tagstruct_putu32(t, s->channel);
+ pa_pstream_send_tagstruct(s->context->pstream, t);
+ pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
+
+ return pa_operation_ref(o);
+}
diff --git a/polyp/polyplib-stream.h b/polyp/polyplib-stream.h
index 1a9d58dd..ff313326 100644
--- a/polyp/polyplib-stream.h
+++ b/polyp/polyplib-stream.h
@@ -70,7 +70,30 @@ void pa_stream_disconnect(struct pa_stream *s);
* and an internal reference to the specified data is kept, the data
* is not copied. If NULL, the data is copied into an internal
* buffer. */
-void pa_stream_write(struct pa_stream *p, const void *data, size_t length, void (*free_cb)(void *p));
+void pa_stream_write(struct pa_stream *p /**< The stream to use */,
+ const void *data /**< The data to write */,
+ size_t length /**< The length of the data to write */,
+ void (*free_cb)(void *p) /**< A cleanup routine for the data or NULL to request an internal copy */,
+ size_t delta /**< Drop this many
+ bytes in the playback
+ buffer before writing
+ this data. Use
+ (size_t) -1 for
+ clearing the whole
+ playback
+ buffer. Normally you
+ will specify 0 here,
+ .i.e. append to the
+ playback buffer. If
+ the value given here
+ is greater than the
+ buffered data length
+ the buffer is cleared
+ and the data is
+ written to the
+ buffer's start. This
+ value is ignored on
+ upload streams. */);
/** Return the amount of bytes that may be written using pa_stream_write() */
size_t pa_stream_writable_size(struct pa_stream *p);
@@ -90,6 +113,16 @@ void pa_stream_set_write_callback(struct pa_stream *p, void (*cb)(struct pa_stre
/** Set the callback function that is called when new data is available from the stream */
void pa_stream_set_read_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata);
+/** Pause (or resume) playback of this stream temporarily
+ * \since 0.3 */
+struct pa_operation* pa_stream_cork(struct pa_stream *s, int b, void (*cb) (struct pa_stream*s, int success, void *userdata), void *userdata);
+
+/** Flush the playback buffer of this stream. Most of the time you're
+ * better off using the delta of pa_stream_write() instead of this
+ * function.
+ * \since 0.3*/
+struct pa_operation* pa_stream_flush(struct pa_stream *s, void (*cb)(struct pa_stream *s, int success, void *userdata), void *userdata);
+
PA_C_DECL_END
#endif
diff --git a/polyp/protocol-esound.c b/polyp/protocol-esound.c
index be2ef2b9..5102540b 100644
--- a/polyp/protocol-esound.c
+++ b/polyp/protocol-esound.c
@@ -102,7 +102,7 @@ typedef struct proto_handler {
const char *description;
} esd_proto_handler_info_t;
-static void sink_input_drop_cb(struct pa_sink_input *i, size_t length);
+static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk);
static void sink_input_kill_cb(struct pa_sink_input *i);
static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i);
@@ -835,7 +835,7 @@ static int do_write(struct connection *c) {
return -1;
}
- pa_memblockq_drop(c->output_memblockq, r);
+ pa_memblockq_drop(c->output_memblockq, &chunk, r);
pa_memblock_unref(chunk.memblock);
}
@@ -894,11 +894,11 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk
return 0;
}
-static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {
+static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
struct connection*c = i->userdata;
assert(i && c && length);
- pa_memblockq_drop(c->input_memblockq, length);
+ pa_memblockq_drop(c->input_memblockq, chunk, length);
/* do something */
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
diff --git a/polyp/protocol-native.c b/polyp/protocol-native.c
index c8e5137e..8b39482c 100644
--- a/polyp/protocol-native.c
+++ b/polyp/protocol-native.c
@@ -107,7 +107,7 @@ struct pa_protocol_native {
};
static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk);
-static void sink_input_drop_cb(struct pa_sink_input *i, size_t length);
+static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
static void sink_input_kill_cb(struct pa_sink_input *i);
static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i);
@@ -135,6 +135,8 @@ static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uin
static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
+static void command_flush_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_ERROR] = { NULL },
@@ -176,6 +178,8 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_SUBSCRIBE] = { command_subscribe },
[PA_COMMAND_SET_SINK_VOLUME] = { command_set_volume },
[PA_COMMAND_SET_SINK_INPUT_VOLUME] = { command_set_volume },
+ [PA_COMMAND_CORK_PLAYBACK_STREAM] = { command_cork_playback_stream },
+ [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = { command_flush_playback_stream },
};
/* structure management */
@@ -376,7 +380,7 @@ static void send_memblock(struct connection *c) {
chunk.length = r->fragment_size;
pa_pstream_send_memblock(c->pstream, r->index, 0, &chunk);
- pa_memblockq_drop(r->memblockq, chunk.length);
+ pa_memblockq_drop(r->memblockq, &chunk, chunk.length);
pa_memblock_unref(chunk.memblock);
return;
@@ -422,12 +426,12 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk
return 0;
}
-static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {
+static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
struct playback_stream *s;
assert(i && i->userdata && length);
s = i->userdata;
- pa_memblockq_drop(s->memblockq, length);
+ pa_memblockq_drop(s->memblockq, chunk, length);
request_bytes(s);
if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
@@ -1293,6 +1297,59 @@ static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32
pa_pstream_send_simple_ack(c->pstream, tag);
}
+static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+ struct connection *c = userdata;
+ uint32_t index;
+ uint32_t b;
+ struct playback_stream *s;
+ assert(c && t);
+
+ if (pa_tagstruct_getu32(t, &index) < 0 ||
+ pa_tagstruct_getu32(t, &b) < 0 ||
+ !pa_tagstruct_eof(t)) {
+ protocol_error(c);
+ return;
+ }
+
+ if (!c->authorized) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ return;
+ }
+
+ if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ return;
+ }
+
+ pa_sink_input_cork(s->sink_input, b);
+ pa_pstream_send_simple_ack(c->pstream, tag);
+}
+
+static void command_flush_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
+ struct connection *c = userdata;
+ uint32_t index;
+ struct playback_stream *s;
+ assert(c && t);
+
+ if (pa_tagstruct_getu32(t, &index) < 0 ||
+ !pa_tagstruct_eof(t)) {
+ protocol_error(c);
+ return;
+ }
+
+ if (!c->authorized) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ return;
+ }
+
+ if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ return;
+ }
+
+ pa_memblockq_flush(s->memblockq);
+ pa_pstream_send_simple_ack(c->pstream, tag);
+}
/*** pstream callbacks ***/
@@ -1306,7 +1363,7 @@ static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *pack
}
}
-static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) {
+static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) {
struct connection *c = userdata;
struct output_stream *stream;
assert(p && chunk && userdata);
@@ -1338,7 +1395,6 @@ static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, in
u->memchunk = *chunk;
pa_memblock_ref(u->memchunk.memblock);
u->length = 0;
- fprintf(stderr, "COPY\n");
} else {
u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat);
u->memchunk.index = u->memchunk.length = 0;
diff --git a/polyp/protocol-simple.c b/polyp/protocol-simple.c
index 58343486..b03c2e54 100644
--- a/polyp/protocol-simple.c
+++ b/polyp/protocol-simple.c
@@ -159,7 +159,7 @@ static int do_write(struct connection *c) {
return -1;
}
- pa_memblockq_drop(c->output_memblockq, r);
+ pa_memblockq_drop(c->output_memblockq, &chunk, r);
pa_memblock_unref(chunk.memblock);
return 0;
@@ -202,11 +202,11 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk
return 0;
}
-static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {
+static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
struct connection*c = i->userdata;
assert(i && c && length);
- pa_memblockq_drop(c->input_memblockq, length);
+ pa_memblockq_drop(c->input_memblockq, chunk, length);
/* do something */
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
diff --git a/polyp/pstream.c b/polyp/pstream.c
index 5664e18a..ad3dd0e0 100644
--- a/polyp/pstream.c
+++ b/polyp/pstream.c
@@ -50,7 +50,7 @@ struct item_info {
/* memblock info */
struct pa_memchunk chunk;
uint32_t channel;
- int32_t delta;
+ uint32_t delta;
/* packet info */
struct pa_packet *packet;
@@ -86,7 +86,7 @@ struct pa_pstream {
void (*recieve_packet_callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata);
void *recieve_packet_callback_userdata;
- void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata);
+ void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata);
void *recieve_memblock_callback_userdata;
void (*drain_callback)(struct pa_pstream *p, void *userdata);
@@ -219,7 +219,7 @@ void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet) {
p->mainloop->defer_enable(p->defer_event, 1);
}
-void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk) {
+void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk) {
struct item_info *i;
assert(p && channel != (uint32_t) -1 && chunk);
@@ -242,7 +242,7 @@ void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callbac
p->recieve_packet_callback_userdata = userdata;
}
-void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata) {
+void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata) {
assert(p && callback);
p->recieve_memblock_callback = callback;
@@ -378,7 +378,7 @@ static void do_read(struct pa_pstream *p) {
p->recieve_memblock_callback(
p,
ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
- (int32_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]),
+ ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]),
&chunk,
p->recieve_memblock_callback_userdata);
}
diff --git a/polyp/pstream.h b/polyp/pstream.h
index 9a289507..dfd29983 100644
--- a/polyp/pstream.h
+++ b/polyp/pstream.h
@@ -37,10 +37,10 @@ void pa_pstream_unref(struct pa_pstream*p);
struct pa_pstream* pa_pstream_ref(struct pa_pstream*p);
void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet);
-void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk);
+void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk);
void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata);
-void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata);
+void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata);
void pa_pstream_set_drain_callback(struct pa_pstream *p, void (*cb)(struct pa_pstream *p, void *userdata), void *userdata);
void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct pa_pstream *p, void *userdata), void *userdata);
diff --git a/polyp/sink-input.c b/polyp/sink-input.c
index c57dd8e0..5009033f 100644
--- a/polyp/sink-input.c
+++ b/polyp/sink-input.c
@@ -59,6 +59,7 @@ struct pa_sink_input* pa_sink_input_new(struct pa_sink *s, const char *name, con
i->get_latency = NULL;
i->userdata = NULL;
+ i->corked = 0;
i->volume = PA_VOLUME_NORM;
i->resampled_chunk.memblock = NULL;
@@ -120,6 +121,9 @@ uint32_t pa_sink_input_get_latency(struct pa_sink_input *i) {
int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk) {
assert(i && chunk && i->peek && i->drop);
+ if (i->corked == 0)
+ return -1;
+
if (!i->resampler)
return i->peek(i, chunk);
@@ -134,11 +138,12 @@ int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk) {
assert(tchunk.length);
l = pa_resampler_request(i->resampler, CONVERT_BUFFER_LENGTH);
+
+ i->drop(i, &tchunk, l);
+
if (tchunk.length > l)
tchunk.length = l;
- i->drop(i, tchunk.length);
-
pa_resampler_run(i->resampler, &tchunk, &i->resampled_chunk);
pa_memblock_unref(tchunk.memblock);
}
@@ -149,11 +154,11 @@ int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk) {
return 0;
}
-void pa_sink_input_drop(struct pa_sink_input *i, size_t length) {
+void pa_sink_input_drop(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
assert(i && length);
if (!i->resampler) {
- i->drop(i, length);
+ i->drop(i, chunk, length);
return;
}
@@ -177,3 +182,13 @@ void pa_sink_input_set_volume(struct pa_sink_input *i, pa_volume_t volume) {
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
}
}
+
+void pa_sink_input_cork(struct pa_sink_input *i, int b) {
+ int n;
+ assert(i);
+ n = i->corked && !b;
+ i->corked = b;
+
+ if (n)
+ pa_sink_notify(i->sink);
+}
diff --git a/polyp/sink-input.h b/polyp/sink-input.h
index 8d7788d8..b0644540 100644
--- a/polyp/sink-input.h
+++ b/polyp/sink-input.h
@@ -34,6 +34,8 @@
struct pa_sink_input {
uint32_t index;
+ int corked;
+
char *name;
struct pa_module *owner;
struct pa_client *client;
@@ -42,7 +44,7 @@ struct pa_sink_input {
uint32_t volume;
int (*peek) (struct pa_sink_input *i, struct pa_memchunk *chunk);
- void (*drop) (struct pa_sink_input *i, size_t length);
+ void (*drop) (struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
void (*kill) (struct pa_sink_input *i);
uint32_t (*get_latency) (struct pa_sink_input *i);
@@ -62,8 +64,10 @@ void pa_sink_input_kill(struct pa_sink_input *i);
uint32_t pa_sink_input_get_latency(struct pa_sink_input *i);
int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk);
-void pa_sink_input_drop(struct pa_sink_input *i, size_t length);
+void pa_sink_input_drop(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
void pa_sink_input_set_volume(struct pa_sink_input *i, pa_volume_t volume);
+void pa_sink_input_cork(struct pa_sink_input *i, int b);
+
#endif
diff --git a/polyp/sink.c b/polyp/sink.c
index 62b9a7af..43fd351c 100644
--- a/polyp/sink.c
+++ b/polyp/sink.c
@@ -147,8 +147,8 @@ static void inputs_drop(struct pa_sink *s, struct pa_mix_info *info, unsigned ma
struct pa_sink_input *i = info->userdata;
assert(i && info->chunk.memblock);
+ pa_sink_input_drop(i, &info->chunk, length);
pa_memblock_unref(info->chunk.memblock);
- pa_sink_input_drop(i, length);
}
}
diff --git a/polyp/util.c b/polyp/util.c
index 2878c546..6c8febb6 100644
--- a/polyp/util.c
+++ b/polyp/util.c
@@ -36,6 +36,7 @@
#include <pwd.h>
#include <signal.h>
#include <pthread.h>
+#include <sys/time.h>
#include "util.h"
#include "xmalloc.h"
@@ -192,3 +193,23 @@ char *pa_get_host_name(char *s, size_t l) {
s[l-1] = 0;
return s;
}
+
+uint32_t pa_age(struct timeval *tv) {
+ struct timeval now;
+ uint32_t r;
+ assert(tv);
+
+ if (tv->tv_sec == 0)
+ return 0;
+
+ gettimeofday(&now, NULL);
+
+ r = (now.tv_sec-tv->tv_sec) * 1000000;
+
+ if (now.tv_usec >= tv->tv_usec)
+ r += now.tv_usec - tv->tv_usec;
+ else
+ r -= tv->tv_usec - now.tv_usec;
+
+ return r;
+}
diff --git a/polyp/util.h b/polyp/util.h
index 7dd7b7de..9dab45d2 100644
--- a/polyp/util.h
+++ b/polyp/util.h
@@ -23,6 +23,7 @@
***/
#include <sys/types.h>
+#include <inttypes.h>
void pa_make_nonblock_fd(int fd);
@@ -38,4 +39,6 @@ char *pa_sprintf_malloc(const char *format, ...) __attribute__ ((format (printf,
char *pa_get_user_name(char *s, size_t l);
char *pa_get_host_name(char *s, size_t l);
+uint32_t pa_age(struct timeval *tv);
+
#endif