summaryrefslogtreecommitdiffstats
path: root/src/polypcore
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2006-02-20 04:05:16 +0000
committerLennart Poettering <lennart@poettering.net>2006-02-20 04:05:16 +0000
commit304449002cbc84fdcf235b5dfaec891278dd7085 (patch)
tree2a2d00e34d5c620835b76a0d6f7890a1d3e9fb97 /src/polypcore
parent0876b1ba82ea9c988df90ca98d202765ac697313 (diff)
1) Add flexible seeking support (including absolute) for memory block queues and playback streams
2) Add support to synchronize multiple playback streams 3) add two tests for 1) and 2) 4) s/PA_ERROR/PA_ERR/ 5) s/PA_ERROR_OK/PA_OK/ 6) update simple API to deal properly with new peek/drop recording API 7) add beginnings of proper validity checking on API calls in client libs (needs to be extended) 8) report playback buffer overflows/underflows to the client 9) move client side recording mcalign stuff into the memblockq 10) create typedefs for a bunch of API callback prototypes 11) simplify handling of HUP poll() events Yes, i know, it's usually better to commit a lot of small patches instead of a single big one. In this case however, this would have contradicted the other rule: never commit broken or incomplete stuff. *** This stuff needs a lot of additional testing! *** git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@511 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src/polypcore')
-rw-r--r--src/polypcore/iochannel.c45
-rw-r--r--src/polypcore/llist.h10
-rw-r--r--src/polypcore/mcalign.c27
-rw-r--r--src/polypcore/mcalign.h3
-rw-r--r--src/polypcore/memblock.c7
-rw-r--r--src/polypcore/memblock.h1
-rw-r--r--src/polypcore/memblockq.c622
-rw-r--r--src/polypcore/memblockq.h102
-rw-r--r--src/polypcore/native-common.h16
-rw-r--r--src/polypcore/packet.c25
-rw-r--r--src/polypcore/packet.h2
-rw-r--r--src/polypcore/protocol-esound.c62
-rw-r--r--src/polypcore/protocol-native.c388
-rw-r--r--src/polypcore/protocol-simple.c66
-rw-r--r--src/polypcore/pstream.c140
-rw-r--r--src/polypcore/pstream.h5
-rw-r--r--src/polypcore/sample-util.c9
-rw-r--r--src/polypcore/sample-util.h1
-rw-r--r--src/polypcore/sink.c2
-rw-r--r--src/polypcore/sink.h2
20 files changed, 1076 insertions, 459 deletions
diff --git a/src/polypcore/iochannel.c b/src/polypcore/iochannel.c
index 7fd09152..c33f593e 100644
--- a/src/polypcore/iochannel.c
+++ b/src/polypcore/iochannel.c
@@ -59,17 +59,17 @@ static void enable_mainloop_sources(pa_iochannel *io) {
pa_io_event_flags_t f = PA_IO_EVENT_NULL;
assert(io->input_event);
- if (!io->readable)
+ if (!pa_iochannel_is_readable(io))
f |= PA_IO_EVENT_INPUT;
- if (!io->writable)
+ if (!pa_iochannel_is_writable(io))
f |= PA_IO_EVENT_OUTPUT;
io->mainloop->io_enable(io->input_event, f);
} else {
if (io->input_event)
- io->mainloop->io_enable(io->input_event, io->readable ? PA_IO_EVENT_NULL : PA_IO_EVENT_INPUT);
+ io->mainloop->io_enable(io->input_event, pa_iochannel_is_readable(io) ? PA_IO_EVENT_NULL : PA_IO_EVENT_INPUT);
if (io->output_event)
- io->mainloop->io_enable(io->output_event, io->writable ? PA_IO_EVENT_NULL : PA_IO_EVENT_OUTPUT);
+ io->mainloop->io_enable(io->output_event, pa_iochannel_is_writable(io) ? PA_IO_EVENT_NULL : PA_IO_EVENT_OUTPUT);
}
}
@@ -82,33 +82,21 @@ static void callback(pa_mainloop_api* m, pa_io_event *e, int fd, pa_io_event_fla
assert(fd >= 0);
assert(userdata);
- if ((f & (PA_IO_EVENT_HANGUP|PA_IO_EVENT_ERROR)) && !io->hungup) {
+ if ((f & (PA_IO_EVENT_HANGUP|PA_IO_EVENT_ERROR)) & !io->hungup) {
io->hungup = 1;
changed = 1;
+ }
- if (e == io->input_event) {
- io->mainloop->io_free(io->input_event);
- io->input_event = NULL;
-
- if (io->output_event == e)
- io->output_event = NULL;
- } else if (e == io->output_event) {
- io->mainloop->io_free(io->output_event);
- io->output_event = NULL;
- }
- } else {
-
- if ((f & PA_IO_EVENT_INPUT) && !io->readable) {
- io->readable = 1;
- changed = 1;
- assert(e == io->input_event);
- }
-
- if ((f & PA_IO_EVENT_OUTPUT) && !io->writable) {
- io->writable = 1;
- changed = 1;
- assert(e == io->output_event);
- }
+ if ((f & PA_IO_EVENT_INPUT) && !io->readable) {
+ io->readable = 1;
+ changed = 1;
+ assert(e == io->input_event);
+ }
+
+ if ((f & PA_IO_EVENT_OUTPUT) && !io->writable) {
+ io->writable = 1;
+ changed = 1;
+ assert(e == io->output_event);
}
if (changed) {
@@ -217,6 +205,7 @@ ssize_t pa_iochannel_write(pa_iochannel*io, const void*data, size_t l) {
if (r < 0)
#endif
r = write(io->ofd, data, l);
+
if (r >= 0) {
io->writable = 0;
enable_mainloop_sources(io);
diff --git a/src/polypcore/llist.h b/src/polypcore/llist.h
index eb8cd017..c54742d3 100644
--- a/src/polypcore/llist.h
+++ b/src/polypcore/llist.h
@@ -66,4 +66,14 @@
_item->next = _item->prev = NULL; \
} while(0)
+#define PA_LLIST_FIND_HEAD(t,item,head) \
+do { \
+ t **_head = (head), *_item = (item); \
+ *_head = _item; \
+ assert(_head); \
+ while ((*_head)->prev) \
+ *_head = (*_head)->prev; \
+} while (0) \
+
+
#endif
diff --git a/src/polypcore/mcalign.c b/src/polypcore/mcalign.c
index 0f229e28..f90fd7e8 100644
--- a/src/polypcore/mcalign.c
+++ b/src/polypcore/mcalign.c
@@ -43,6 +43,7 @@ pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s) {
assert(base);
m = pa_xnew(pa_mcalign, 1);
+
m->base = base;
pa_memchunk_reset(&m->leftover);
pa_memchunk_reset(&m->current);
@@ -64,11 +65,16 @@ void pa_mcalign_free(pa_mcalign *m) {
}
void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c) {
- assert(m && c && c->memblock && c->length);
+ assert(m);
+ assert(c);
+
+ assert(c->memblock);
+ assert(c->length > 0);
+
+ assert(!m->current.memblock);
/* Append to the leftover memory block */
if (m->leftover.memblock) {
- assert(!m->current.memblock);
/* Try to merge */
if (m->leftover.memblock == c->memblock &&
@@ -110,8 +116,6 @@ void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c) {
}
}
} else {
- assert(!m->leftover.memblock && !m->current.memblock);
-
/* Nothing to merge or copy, just store it */
if (c->length >= m->base)
@@ -124,7 +128,8 @@ void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c) {
}
int pa_mcalign_pop(pa_mcalign *m, pa_memchunk *c) {
- assert(m && c);
+ assert(m);
+ assert(c);
/* First test if there's a leftover memory block available */
if (m->leftover.memblock) {
@@ -187,3 +192,15 @@ int pa_mcalign_pop(pa_mcalign *m, pa_memchunk *c) {
return -1;
}
+
+size_t pa_mcalign_csize(pa_mcalign *m, size_t l) {
+ assert(m);
+ assert(l > 0);
+
+ assert(!m->current.memblock);
+
+ if (m->leftover.memblock)
+ l += m->leftover.length;
+
+ return (l/m->base)*m->base;
+}
diff --git a/src/polypcore/mcalign.h b/src/polypcore/mcalign.h
index a9107e0e..58019462 100644
--- a/src/polypcore/mcalign.h
+++ b/src/polypcore/mcalign.h
@@ -74,4 +74,7 @@ void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c);
* nonzero otherwise. */
int pa_mcalign_pop(pa_mcalign *m, pa_memchunk *c);
+/* If we pass l bytes in now, how many bytes would we get out? */
+size_t pa_mcalign_csize(pa_mcalign *m, size_t l);
+
#endif
diff --git a/src/polypcore/memblock.c b/src/polypcore/memblock.c
index 2c0bef57..04e8436f 100644
--- a/src/polypcore/memblock.c
+++ b/src/polypcore/memblock.c
@@ -111,13 +111,16 @@ pa_memblock *pa_memblock_new_user(void *d, size_t length, void (*free_cb)(void *
}
pa_memblock* pa_memblock_ref(pa_memblock*b) {
- assert(b && b->ref >= 1);
+ assert(b);
+ assert(b->ref >= 1);
+
b->ref++;
return b;
}
void pa_memblock_unref(pa_memblock*b) {
- assert(b && b->ref >= 1);
+ assert(b);
+ assert(b->ref >= 1);
if ((--(b->ref)) == 0) {
stat_remove(b);
diff --git a/src/polypcore/memblock.h b/src/polypcore/memblock.h
index c5751406..9471278a 100644
--- a/src/polypcore/memblock.h
+++ b/src/polypcore/memblock.h
@@ -79,7 +79,6 @@ references to the memory. This causes the memory to be copied and
converted into a PA_MEMBLOCK_DYNAMIC type memory block */
void pa_memblock_unref_fixed(pa_memblock*b);
-
pa_memblock_stat* pa_memblock_stat_new(void);
void pa_memblock_stat_unref(pa_memblock_stat *s);
pa_memblock_stat * pa_memblock_stat_ref(pa_memblock_stat *s);
diff --git a/src/polypcore/memblockq.c b/src/polypcore/memblockq.c
index 4a0225e5..05c810bd 100644
--- a/src/polypcore/memblockq.c
+++ b/src/polypcore/memblockq.c
@@ -38,30 +38,45 @@
struct memblock_list {
struct memblock_list *next, *prev;
+ int64_t index;
pa_memchunk chunk;
};
struct pa_memblockq {
struct memblock_list *blocks, *blocks_tail;
unsigned n_blocks;
- size_t current_length, maxlength, tlength, base, prebuf, orig_prebuf, minreq;
- pa_mcalign *mcalign;
+ size_t maxlength, tlength, base, prebuf, minreq;
+ int64_t read_index, write_index;
+ enum { PREBUF, RUNNING } state;
pa_memblock_stat *memblock_stat;
+ pa_memblock *silence;
+ pa_mcalign *mcalign;
};
-pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t base, size_t prebuf, size_t minreq, pa_memblock_stat *s) {
+pa_memblockq* pa_memblockq_new(
+ int64_t idx,
+ size_t maxlength,
+ size_t tlength,
+ size_t base,
+ size_t prebuf,
+ size_t minreq,
+ pa_memblock *silence,
+ pa_memblock_stat *s) {
+
pa_memblockq* bq;
- assert(maxlength && base && maxlength);
- bq = pa_xmalloc(sizeof(pa_memblockq));
- bq->blocks = bq->blocks_tail = 0;
+ assert(base > 0);
+ assert(maxlength >= base);
+
+ bq = pa_xnew(pa_memblockq, 1);
+ bq->blocks = bq->blocks_tail = NULL;
bq->n_blocks = 0;
- bq->current_length = 0;
+ bq->base = base;
+ bq->read_index = bq->write_index = idx;
+ bq->memblock_stat = s;
pa_log_debug(__FILE__": memblockq requested: maxlength=%u, tlength=%u, base=%u, prebuf=%u, minreq=%u\n", maxlength, tlength, base, prebuf, minreq);
-
- bq->base = base;
bq->maxlength = ((maxlength+base-1)/base)*base;
assert(bq->maxlength >= base);
@@ -70,26 +85,25 @@ pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t base, si
if (!bq->tlength || bq->tlength >= bq->maxlength)
bq->tlength = bq->maxlength;
- bq->minreq = (minreq/base)*base;
- if (bq->minreq == 0)
- bq->minreq = 1;
-
- bq->prebuf = (prebuf == (size_t) -1) ? bq->maxlength/2 : prebuf;
- bq->prebuf = (bq->prebuf/base)*base;
+ bq->prebuf = (prebuf == (size_t) -1) ? bq->tlength/2 : prebuf;
+ bq->prebuf = ((bq->prebuf+base-1)/base)*base;
if (bq->prebuf > bq->maxlength)
bq->prebuf = bq->maxlength;
- if (bq->prebuf > bq->tlength - bq->minreq)
- bq->prebuf = bq->tlength - bq->minreq;
+ bq->minreq = (minreq/base)*base;
+
+ if (bq->minreq > bq->tlength - bq->prebuf)
+ bq->minreq = bq->tlength - bq->prebuf;
- bq->orig_prebuf = bq->prebuf;
+ if (!bq->minreq)
+ bq->minreq = 1;
pa_log_debug(__FILE__": memblockq sanitized: maxlength=%u, tlength=%u, base=%u, prebuf=%u, minreq=%u\n", bq->maxlength, bq->tlength, bq->base, bq->prebuf, bq->minreq);
-
- bq->mcalign = NULL;
-
- bq->memblock_stat = s;
+ bq->state = bq->prebuf ? PREBUF : RUNNING;
+ bq->silence = silence ? pa_memblock_ref(silence) : NULL;
+ bq->mcalign = NULL;
+
return bq;
}
@@ -97,248 +111,510 @@ void pa_memblockq_free(pa_memblockq* bq) {
assert(bq);
pa_memblockq_flush(bq);
-
+
+ if (bq->silence)
+ pa_memblock_unref(bq->silence);
+
if (bq->mcalign)
pa_mcalign_free(bq->mcalign);
-
+
pa_xfree(bq);
}
-void pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *chunk, size_t delta) {
- struct memblock_list *q;
- assert(bq && chunk && chunk->memblock && chunk->length && (chunk->length % bq->base) == 0);
-
- pa_memblockq_seek(bq, delta);
-
- if (bq->blocks_tail && bq->blocks_tail->chunk.memblock == chunk->memblock) {
- /* Try to merge memory chunks */
+static void drop_block(pa_memblockq *bq, struct memblock_list *q) {
+ assert(bq);
+ assert(q);
- if (bq->blocks_tail->chunk.index+bq->blocks_tail->chunk.length == chunk->index) {
- bq->blocks_tail->chunk.length += chunk->length;
- bq->current_length += chunk->length;
- return;
- }
- }
+ assert(bq->n_blocks >= 1);
- q = pa_xmalloc(sizeof(struct memblock_list));
-
- q->chunk = *chunk;
- pa_memblock_ref(q->chunk.memblock);
- assert(q->chunk.index+q->chunk.length <= q->chunk.memblock->length);
- q->next = NULL;
- if ((q->prev = bq->blocks_tail))
- bq->blocks_tail->next = q;
+ if (q->prev)
+ q->prev->next = q->next;
else
- bq->blocks = q;
+ bq->blocks = q->next;
- bq->blocks_tail = q;
+ if (q->next)
+ q->next->prev = q->prev;
+ else
+ bq->blocks_tail = q->prev;
- bq->n_blocks++;
- bq->current_length += chunk->length;
+ pa_memblock_unref(q->chunk.memblock);
+ pa_xfree(q);
- pa_memblockq_shorten(bq, bq->maxlength);
+ bq->n_blocks--;
}
-int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) {
- assert(bq && chunk);
+static int can_push(pa_memblockq *bq, size_t l) {
+ int64_t end;
- if (!bq->blocks || bq->current_length < bq->prebuf)
- return -1;
+ assert(bq);
- bq->prebuf = 0;
+ if (bq->read_index > bq->write_index) {
+ int64_t d = bq->read_index - bq->write_index;
- *chunk = bq->blocks->chunk;
- pa_memblock_ref(chunk->memblock);
+ if (l > d)
+ l -= d;
+ else
+ return 1;
+ }
- return 0;
+ end = bq->blocks_tail ? bq->blocks_tail->index + bq->blocks_tail->chunk.length : 0;
+
+ /* Make sure that the list doesn't get too long */
+ if (bq->write_index + l > end)
+ if (bq->write_index + l - bq->read_index > bq->maxlength)
+ return 0;
+
+ return 1;
}
-void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length) {
- assert(bq && chunk && length);
+int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {
+
+ struct memblock_list *q, *n;
+ pa_memchunk chunk;
+
+ assert(bq);
+ assert(uchunk);
+ assert(uchunk->memblock);
+ assert(uchunk->length > 0);
+ assert(uchunk->index + uchunk->length <= uchunk->memblock->length);
+
+ if (uchunk->length % bq->base)
+ return -1;
+
+ if (!can_push(bq, uchunk->length))
+ return -1;
- if (!bq->blocks || memcmp(&bq->blocks->chunk, chunk, sizeof(pa_memchunk)))
- return;
+ chunk = *uchunk;
- assert(length <= bq->blocks->chunk.length);
- pa_memblockq_skip(bq, length);
-}
+ if (bq->read_index > bq->write_index) {
-static void remove_block(pa_memblockq *bq, struct memblock_list *q) {
- assert(bq && q);
+ /* We currently have a buffer underflow, we need to drop some
+ * incoming data */
- if (q->prev)
- q->prev->next = q->next;
- else {
- assert(bq->blocks == q);
- bq->blocks = q->next;
+ int64_t d = bq->read_index - bq->write_index;
+
+ if (chunk.length > d) {
+ chunk.index += d;
+ chunk.length -= d;
+ bq->write_index = bq->read_index;
+ } else {
+ /* We drop the incoming data completely */
+ bq->write_index += chunk.length;
+ return 0;
+ }
}
- if (q->next)
- q->next->prev = q->prev;
- else {
- assert(bq->blocks_tail == q);
- bq->blocks_tail = q->prev;
+ /* We go from back to front to look for the right place to add
+ * this new entry. Drop data we will overwrite on the way */
+
+ q = bq->blocks_tail;
+ while (q) {
+
+ if (bq->write_index >= q->index + q->chunk.length)
+ /* We found the entry where we need to place the new entry immediately after */
+ break;
+ else if (bq->write_index + chunk.length <= q->index) {
+ /* This entry isn't touched at all, let's skip it */
+ q = q->prev;
+ } else if (bq->write_index <= q->index &&
+ bq->write_index + chunk.length >= q->index + q->chunk.length) {
+
+ /* This entry is fully replaced by the new entry, so let's drop it */
+
+ struct memblock_list *p;
+ p = q;
+ q = q->prev;
+ drop_block(bq, p);
+ } else if (bq->write_index >= q->index) {
+ /* The write index points into this memblock, so let's
+ * truncate or split it */
+
+ if (bq->write_index + chunk.length < q->index + q->chunk.length) {
+
+ /* We need to save the end of this memchunk */
+ struct memblock_list *p;
+ size_t d;
+
+ /* Create a new list entry for the end of thie memchunk */
+ p = pa_xnew(struct memblock_list, 1);
+ p->chunk = q->chunk;
+ pa_memblock_ref(p->chunk.memblock);
+
+ /* Calculate offset */
+ d = bq->write_index + chunk.length - q->index;
+ assert(d > 0);
+
+ /* Drop it from the new entry */
+ p->index = q->index + d;
+ p->chunk.length -= d;
+
+ /* Add it to the list */
+ p->prev = q;
+ if ((p->next = q->next))
+ q->next->prev = p;
+ else
+ bq->blocks_tail = p;
+ q->next = p;
+
+ bq->n_blocks++;
+ }
+
+ /* Truncate the chunk */
+ if (!(q->chunk.length = bq->write_index - q->index)) {
+ struct memblock_list *p;
+ p = q;
+ q = q->prev;
+ drop_block(bq, p);
+ }
+
+ /* We had to truncate this block, hence we're now at the right position */
+ break;
+ } else {
+ size_t d;
+
+ assert(bq->write_index + chunk.length > q->index &&
+ bq->write_index + chunk.length < q->index + q->chunk.length &&
+ bq->write_index < q->index);
+
+ /* The job overwrites the current entry at the end, so let's drop the beginning of this entry */
+
+ d = bq->write_index + chunk.length - q->index;
+ q->index += d;
+ q->chunk.index += d;
+ q->chunk.length -= d;
+
+ q = q->prev;
+ }
+
}
+
+ if (q) {
+ assert(bq->write_index >= q->index + q->chunk.length);
+ assert(!q->next || (bq->write_index+chunk.length <= q->next->index));
+
+ /* Try to merge memory blocks */
+
+ if (q->chunk.memblock == chunk.memblock &&
+ q->chunk.index + q->chunk.length == chunk.index &&
+ bq->write_index == q->index + q->chunk.length) {
+
+ q->chunk.length += chunk.length;
+ bq->write_index += chunk.length;
+ return 0;
+ }
+ } else
+ assert(!bq->blocks || (bq->write_index+chunk.length <= bq->blocks->index));
+
+
+ n = pa_xnew(struct memblock_list, 1);
+ n->chunk = chunk;
+ pa_memblock_ref(n->chunk.memblock);
+ n->index = bq->write_index;
+ bq->write_index += n->chunk.length;
+
+ n->next = q ? q->next : bq->blocks;
+ n->prev = q;
+
+ if (n->next)
+ n->next->prev = n;
+ else
+ bq->blocks_tail = n;
+
+ if (n->prev)
+ n->prev->next = n;
+ else
+ bq->blocks = n;
- pa_memblock_unref(q->chunk.memblock);
- pa_xfree(q);
-
- bq->n_blocks--;
+ bq->n_blocks++;
+ return 0;
}
-void pa_memblockq_skip(pa_memblockq *bq, size_t length) {
- assert(bq && length && (length % bq->base) == 0);
+int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) {
+ assert(bq);
+ assert(chunk);
- while (length > 0) {
- size_t l = length;
- assert(bq->blocks && bq->current_length >= length);
-
- if (l > bq->blocks->chunk.length)
- l = bq->blocks->chunk.length;
+ if (bq->state == PREBUF) {
- bq->blocks->chunk.index += l;
- bq->blocks->chunk.length -= l;
- bq->current_length -= l;
-
- if (!bq->blocks->chunk.length)
- remove_block(bq, bq->blocks);
+ /* We need to pre-buffer */
+ if (pa_memblockq_get_length(bq) < bq->prebuf)
+ return -1;
+
+ bq->state = RUNNING;
- length -= l;
+ } else if (bq->prebuf > 0 && bq->read_index >= bq->write_index) {
+
+ /* Buffer underflow protection */
+ bq->state = PREBUF;
+ return -1;
}
-}
+
+ /* Do we need to spit out silence? */
+ if (!bq->blocks || bq->blocks->index > bq->read_index) {
-void pa_memblockq_shorten(pa_memblockq *bq, size_t length) {
- size_t l;
- assert(bq);
+ size_t length;
+
+ /* How much silence shall we return? */
+ length = bq->blocks ? bq->blocks->index - bq->read_index : 0;
+
+ /* We need to return silence, since no data is yet available */
+ if (bq->silence) {
+ chunk->memblock = pa_memblock_ref(bq->silence);
- if (bq->current_length <= length)
- return;
+ if (!length || length > chunk->memblock->length)
+ length = chunk->memblock->length;
+
+ chunk->length = length;
+ } else {
+ chunk->memblock = NULL;
+ chunk->length = length;
+ }
+
+ chunk->index = 0;
+ return 0;
+ }
- /*pa_log(__FILE__": Warning! pa_memblockq_shorten()\n");*/
+ /* Ok, let's pass real data to the caller */
+ assert(bq->blocks->index == bq->read_index);
- l = bq->current_length - length;
- l /= bq->base;
- l *= bq->base;
+ *chunk = bq->blocks->chunk;
+ pa_memblock_ref(chunk->memblock);
- pa_memblockq_skip(bq, l);
+ return 0;
}
-
-void pa_memblockq_empty(pa_memblockq *bq) {
+void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length) {
assert(bq);
- pa_memblockq_shorten(bq, 0);
+ assert(length % bq->base == 0);
+
+ assert(!chunk || length <= chunk->length);
+
+ if (chunk) {
+
+ if (bq->blocks && bq->blocks->index == bq->read_index) {
+ /* The first item in queue is valid */
+
+ /* Does the chunk match with what the user supplied us? */
+ if (memcmp(chunk, &bq->blocks->chunk, sizeof(pa_memchunk)) != 0)
+ return;
+
+ } else {
+ size_t l;
+
+ /* The first item in the queue is not yet relevant */
+
+ assert(!bq->blocks || bq->blocks->index > bq->read_index);
+ l = bq->blocks ? bq->blocks->index - bq->read_index : 0;
+
+ if (bq->silence) {
+
+ if (!l || l > bq->silence->length)
+ l = bq->silence->length;
+
+ }
+
+ /* Do the entries still match? */
+ if (chunk->index != 0 || chunk->length != l || chunk->memblock != bq->silence)
+ return;
+ }
+ }
+
+ while (length > 0) {
+
+ if (bq->blocks) {
+ size_t d;
+
+ assert(bq->blocks->index >= bq->read_index);
+
+ d = (size_t) (bq->blocks->index - bq->read_index);
+
+ if (d >= length) {
+ /* The first block is too far in the future */
+
+ bq->read_index += length;
+ break;
+ } else {
+
+ length -= d;
+ bq->read_index += d;
+ }
+
+ assert(bq->blocks->index == bq->read_index);
+
+ if (bq->blocks->chunk.length <= length) {
+ /* We need to drop the full block */
+
+ length -= bq->blocks->chunk.length;
+ bq->read_index += bq->blocks->chunk.length;
+
+ drop_block(bq, bq->blocks);
+ } else {
+ /* Only the start of this block needs to be dropped */
+
+ bq->blocks->chunk.index += length;
+ bq->blocks->chunk.length -= length;
+ bq->blocks->index += length;
+ bq->read_index += length;
+ break;
+ }
+
+ } else {
+
+ /* The list is empty, there's nothing we could drop */
+ bq->read_index += length;
+ break;
+ }
+ }
}
int pa_memblockq_is_readable(pa_memblockq *bq) {
assert(bq);
- return bq->current_length && (bq->current_length >= bq->prebuf);
+ if (bq->prebuf > 0) {
+ size_t l = pa_memblockq_get_length(bq);
+
+ if (bq->state == PREBUF && l < bq->prebuf)
+ return 0;
+
+ if (l <= 0)
+ return 0;
+ }
+
+ return 1;
}
int pa_memblockq_is_writable(pa_memblockq *bq, size_t length) {
assert(bq);
- return bq->current_length + length <= bq->tlength;
+ if (length % bq->base)
+ return 0;
+
+ return pa_memblockq_get_length(bq) + length <= bq->tlength;
}
-uint32_t pa_memblockq_get_length(pa_memblockq *bq) {
+size_t pa_memblockq_get_length(pa_memblockq *bq) {
assert(bq);
- return bq->current_length;
+
+ if (bq->write_index <= bq->read_index)
+ return 0;
+
+ return (size_t) (bq->write_index - bq->read_index);
}
-uint32_t pa_memblockq_missing(pa_memblockq *bq) {
+size_t pa_memblockq_missing(pa_memblockq *bq) {
size_t l;
assert(bq);
- if (bq->current_length >= bq->tlength)
+ if ((l = pa_memblockq_get_length(bq)) >= bq->tlength)
return 0;
- l = bq->tlength - bq->current_length;
- assert(l);
-
+ l = bq->tlength - l;
return (l >= bq->minreq) ? l : 0;
}
-void pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk, size_t delta) {
- pa_memchunk rchunk;
- assert(bq && chunk && bq->base);
+size_t pa_memblockq_get_minreq(pa_memblockq *bq) {
+ assert(bq);
- if (bq->base == 1) {
- pa_memblockq_push(bq, chunk, delta);
- return;
- }
+ return bq->minreq;
+}
- if (!bq->mcalign) {
- bq->mcalign = pa_mcalign_new(bq->base, bq->memblock_stat);
- assert(bq->mcalign);
+void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek) {
+ assert(bq);
+
+ switch (seek) {
+ case PA_SEEK_RELATIVE:
+ bq->write_index += offset;
+ return;
+ case PA_SEEK_ABSOLUTE:
+ bq->write_index = offset;
+ return;
+ case PA_SEEK_RELATIVE_ON_READ:
+ bq->write_index = bq->read_index + offset;
+ return;
+ case PA_SEEK_RELATIVE_END:
+ bq->write_index = (bq->blocks_tail ? bq->blocks_tail->index + bq->blocks_tail->chunk.length : bq->read_index) + offset;
+ return;
}
+
+ assert(0);
+}
+
+void pa_memblockq_flush(pa_memblockq *bq) {
+ assert(bq);
- pa_mcalign_push(bq->mcalign, chunk);
+ while (bq->blocks)
+ drop_block(bq, bq->blocks);
- while (pa_mcalign_pop(bq->mcalign, &rchunk) >= 0) {
- pa_memblockq_push(bq, &rchunk, delta);
- pa_memblock_unref(rchunk.memblock);
- delta = 0;
- }
+ assert(bq->n_blocks == 0);
+ bq->write_index = bq->read_index;
+
+ pa_memblockq_prebuf_force(bq);
}
-uint32_t pa_memblockq_get_minreq(pa_memblockq *bq) {
+size_t pa_memblockq_get_tlength(pa_memblockq *bq) {
assert(bq);
- return bq->minreq;
+
+ return bq->tlength;
}
-void pa_memblockq_prebuf_disable(pa_memblockq *bq) {
+int64_t pa_memblockq_get_read_index(pa_memblockq *bq) {
assert(bq);
- bq->prebuf = 0;
+ return bq->read_index;
}
-void pa_memblockq_prebuf_reenable(pa_memblockq *bq) {
+int64_t pa_memblockq_get_write_index(pa_memblockq *bq) {
assert(bq);
- bq->prebuf = bq->orig_prebuf;
+ return bq->write_index;
}
-void pa_memblockq_seek(pa_memblockq *bq, size_t length) {
+int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk) {
+ pa_memchunk rchunk;
+
assert(bq);
+ assert(chunk && bq->base);
+
+ if (bq->base == 1)
+ return pa_memblockq_push(bq, chunk);
+
+ if (!bq->mcalign)
+ bq->mcalign = pa_mcalign_new(bq->base, bq->memblock_stat);
- if (!length)
- return;
+ if (!can_push(bq, pa_mcalign_csize(bq->mcalign, chunk->length)))
+ return -1;
+
+ pa_mcalign_push(bq->mcalign, chunk);
+
+ while (pa_mcalign_pop(bq->mcalign, &rchunk) >= 0) {
+ int r;
+ r = pa_memblockq_push(bq, &rchunk);
+ pa_memblock_unref(rchunk.memblock);
- while (length >= bq->base) {
- size_t l = length;
- if (!bq->current_length)
- return;
+ if (r < 0)
+ return -1;
+ }
- assert(bq->blocks_tail);
-
- if (l > bq->blocks_tail->chunk.length)
- l = bq->blocks_tail->chunk.length;
+ return 0;
+}
- bq->blocks_tail->chunk.length -= l;
- bq->current_length -= l;
-
- if (bq->blocks_tail->chunk.length == 0)
- remove_block(bq, bq->blocks);
+void pa_memblockq_shorten(pa_memblockq *bq, size_t length) {
+ size_t l;
+ assert(bq);
- length -= l;
- }
+ l = pa_memblockq_get_length(bq);
+
+ if (l > length)
+ pa_memblockq_drop(bq, NULL, l - length);
}
-void pa_memblockq_flush(pa_memblockq *bq) {
- struct memblock_list *l;
+void pa_memblockq_prebuf_disable(pa_memblockq *bq) {
assert(bq);
-
- while ((l = bq->blocks)) {
- bq->blocks = l->next;
- pa_memblock_unref(l->chunk.memblock);
- pa_xfree(l);
- }
- bq->blocks_tail = NULL;
- bq->n_blocks = 0;
- bq->current_length = 0;
+ if (bq->state == PREBUF)
+ bq->state = RUNNING;
}
-uint32_t pa_memblockq_get_tlength(pa_memblockq *bq) {
+void pa_memblockq_prebuf_force(pa_memblockq *bq) {
assert(bq);
- return bq->tlength;
+
+ if (bq->state == RUNNING && bq->prebuf > 0)
+ bq->state = PREBUF;
}
diff --git a/src/polypcore/memblockq.h b/src/polypcore/memblockq.h
index 7bb25f90..210f1a07 100644
--- a/src/polypcore/memblockq.h
+++ b/src/polypcore/memblockq.h
@@ -23,9 +23,11 @@
***/
#include <sys/types.h>
+#include <inttypes.h>
#include <polypcore/memblock.h>
#include <polypcore/memchunk.h>
+#include <polyp/def.h>
/* A memblockq is a queue of pa_memchunks (yepp, the name is not
* perfect). It is similar to the ring buffers used by most other
@@ -35,42 +37,59 @@
typedef struct pa_memblockq pa_memblockq;
+
/* Parameters:
- - maxlength: maximum length of queue. If more data is pushed into the queue, data from the front is dropped
- - length: the target length of the queue.
- - base: a base value for all metrics. Only multiples of this value are popped from the queue
- - prebuf: before passing the first byte out, make sure that enough bytes are in the queue
- - minreq: pa_memblockq_missing() will only return values greater than this value
+
+ - idx: start value for both read and write index
+
+ - maxlength: maximum length of queue. If more data is pushed into
+ the queue, the operation will fail. Must not be 0.
+
+ - tlength: the target length of the queue. Pass 0 for the default.
+
+ - base: a base value for all metrics. Only multiples of this value
+ are popped from the queue or should be pushed into
+ it. Must not be 0.
+
+ - prebuf: If the queue runs empty wait until this many bytes are in
+ queue again before passing the first byte out. If set
+ to 0 pa_memblockq_pop() will return a silence memblock
+ if no data is in the queue and will never fail. Pass
+ (size_t) -1 for the default.
+
+ - minreq: pa_memblockq_missing() will only return values greater
+ than this value. Pass 0 for the default.
+
+ - silence: return this memblock whzen reading unitialized data
*/
-pa_memblockq* pa_memblockq_new(size_t maxlength,
- size_t tlength,
- size_t base,
- size_t prebuf,
- size_t minreq,
- pa_memblock_stat *s);
+pa_memblockq* pa_memblockq_new(
+ int64_t idx,
+ size_t maxlength,
+ size_t tlength,
+ size_t base,
+ size_t prebuf,
+ size_t minreq,
+ pa_memblock *silence,
+ pa_memblock_stat *s);
+
void pa_memblockq_free(pa_memblockq*bq);
-/* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. */
-void pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *chunk, size_t delta);
+/* Push a new memory chunk into the queue. */
+int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *chunk);
-/* Same as pa_memblockq_push(), however chunks are filtered through a mcalign object, and thus aligned to multiples of base */
-void pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk, size_t delta);
+/* Push a new memory chunk into the queue, but filter it through a
+ * pa_mcalign object. Don't mix this with pa_memblockq_seek() unless
+ * you know what you do. */
+int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk);
/* Return a copy of the next memory chunk in the queue. It is not removed from the queue */
int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk);
-/* Drop the specified bytes from the queue, only valid aufter pa_memblockq_peek() */
+/* Drop the specified bytes from the queue, but only if the first
+ * chunk in the queue matches the one passed here. If NULL is passed,
+ * this check isn't done. */
void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length);
-/* Drop the specified bytes from the queue */
-void pa_memblockq_skip(pa_memblockq *bq, size_t length);
-
-/* Shorten the pa_memblockq to the specified length by dropping data at the end of the queue */
-void pa_memblockq_shorten(pa_memblockq *bq, size_t length);
-
-/* Empty the pa_memblockq */
-void pa_memblockq_empty(pa_memblockq *bq);
-
/* Test if the pa_memblockq is currently readable, that is, more data than base */
int pa_memblockq_is_readable(pa_memblockq *bq);
@@ -78,27 +97,38 @@ int pa_memblockq_is_readable(pa_memblockq *bq);
int pa_memblockq_is_writable(pa_memblockq *bq, size_t length);
/* Return the length of the queue in bytes */
-uint32_t pa_memblockq_get_length(pa_memblockq *bq);
+size_t pa_memblockq_get_length(pa_memblockq *bq);
/* Return how many bytes are missing in queue to the specified fill amount */
-uint32_t pa_memblockq_missing(pa_memblockq *bq);
+size_t pa_memblockq_missing(pa_memblockq *bq);
/* Returns the minimal request value */
-uint32_t pa_memblockq_get_minreq(pa_memblockq *bq);
-
-/* Force disabling of pre-buf even when the pre-buffer is not yet filled */
-void pa_memblockq_prebuf_disable(pa_memblockq *bq);
-
-/* Reenable pre-buf to the initial level */
-void pa_memblockq_prebuf_reenable(pa_memblockq *bq);
+size_t pa_memblockq_get_minreq(pa_memblockq *bq);
/* Manipulate the write pointer */
-void pa_memblockq_seek(pa_memblockq *bq, size_t delta);
+void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek);
-/* Flush the queue */
+/* Set the queue to silence, set write index to read index */
void pa_memblockq_flush(pa_memblockq *bq);
/* Get Target length */
uint32_t pa_memblockq_get_tlength(pa_memblockq *bq);
+/* Return the current read index */
+int64_t pa_memblockq_get_read_index(pa_memblockq *bq);
+
+/* Return the current write index */
+int64_t pa_memblockq_get_write_index(pa_memblockq *bq);
+
+/* Shorten the pa_memblockq to the specified length by dropping data
+ * at the read end of the queue. The read index is increased until the
+ * queue has the specified length */
+void pa_memblockq_shorten(pa_memblockq *bq, size_t length);
+
+/* Ignore prebuf for now */
+void pa_memblockq_prebuf_disable(pa_memblockq *bq);
+
+/* Force prebuf */
+void pa_memblockq_prebuf_force(pa_memblockq *bq);
+
#endif
diff --git a/src/polypcore/native-common.h b/src/polypcore/native-common.h
index ac3ea823..0d17b022 100644
--- a/src/polypcore/native-common.h
+++ b/src/polypcore/native-common.h
@@ -28,22 +28,22 @@
PA_C_DECL_BEGIN
enum {
+ /* Generic commands */
PA_COMMAND_ERROR,
PA_COMMAND_TIMEOUT, /* pseudo command */
PA_COMMAND_REPLY,
+
+ /* Commands from client to server */
PA_COMMAND_CREATE_PLAYBACK_STREAM,
PA_COMMAND_DELETE_PLAYBACK_STREAM,
PA_COMMAND_CREATE_RECORD_STREAM,
PA_COMMAND_DELETE_RECORD_STREAM,
PA_COMMAND_EXIT,
- PA_COMMAND_REQUEST,
PA_COMMAND_AUTH,
PA_COMMAND_SET_CLIENT_NAME,
PA_COMMAND_LOOKUP_SINK,
PA_COMMAND_LOOKUP_SOURCE,
PA_COMMAND_DRAIN_PLAYBACK_STREAM,
- PA_COMMAND_PLAYBACK_STREAM_KILLED,
- PA_COMMAND_RECORD_STREAM_KILLED,
PA_COMMAND_STAT,
PA_COMMAND_GET_PLAYBACK_LATENCY,
PA_COMMAND_CREATE_UPLOAD_STREAM,
@@ -68,7 +68,6 @@ enum {
PA_COMMAND_GET_SAMPLE_INFO,
PA_COMMAND_GET_SAMPLE_INFO_LIST,
PA_COMMAND_SUBSCRIBE,
- PA_COMMAND_SUBSCRIBE_EVENT,
PA_COMMAND_SET_SINK_VOLUME,
PA_COMMAND_SET_SINK_INPUT_VOLUME,
@@ -95,6 +94,15 @@ enum {
PA_COMMAND_CORK_RECORD_STREAM,
PA_COMMAND_FLUSH_RECORD_STREAM,
PA_COMMAND_PREBUF_PLAYBACK_STREAM,
+
+ /* Commands from server to client */
+ PA_COMMAND_REQUEST,
+ PA_COMMAND_OVERFLOW,
+ PA_COMMAND_UNDERFLOW,
+ PA_COMMAND_PLAYBACK_STREAM_KILLED,
+ PA_COMMAND_RECORD_STREAM_KILLED,
+ PA_COMMAND_SUBSCRIBE_EVENT,
+
PA_COMMAND_MAX
};
diff --git a/src/polypcore/packet.c b/src/polypcore/packet.c
index 41803cf9..31ddad95 100644
--- a/src/polypcore/packet.c
+++ b/src/polypcore/packet.c
@@ -32,37 +32,46 @@
pa_packet* pa_packet_new(size_t length) {
pa_packet *p;
+
assert(length);
+
p = pa_xmalloc(sizeof(pa_packet)+length);
p->ref = 1;
p->length = length;
p->data = (uint8_t*) (p+1);
p->type = PA_PACKET_APPENDED;
+
return p;
}
-pa_packet* pa_packet_new_dynamic(uint8_t* data, size_t length) {
+pa_packet* pa_packet_new_dynamic(void* data, size_t length) {
pa_packet *p;
- assert(data && length);
- p = pa_xmalloc(sizeof(pa_packet));
+
+ assert(data);
+ assert(length);
+
+ p = pa_xnew(pa_packet, 1);
p->ref = 1;
p->length = length;
p->data = data;
p->type = PA_PACKET_DYNAMIC;
+
return p;
}
pa_packet* pa_packet_ref(pa_packet *p) {
- assert(p && p->ref >= 1);
+ assert(p);
+ assert(p->ref >= 1);
+
p->ref++;
return p;
}
void pa_packet_unref(pa_packet *p) {
- assert(p && p->ref >= 1);
- p->ref--;
-
- if (p->ref == 0) {
+ assert(p);
+ assert(p->ref >= 1);
+
+ if (--p->ref == 0) {
if (p->type == PA_PACKET_DYNAMIC)
pa_xfree(p->data);
pa_xfree(p);
diff --git a/src/polypcore/packet.h b/src/polypcore/packet.h
index 0ac47485..fbc58232 100644
--- a/src/polypcore/packet.h
+++ b/src/polypcore/packet.h
@@ -33,7 +33,7 @@ typedef struct pa_packet {
} pa_packet;
pa_packet* pa_packet_new(size_t length);
-pa_packet* pa_packet_new_dynamic(uint8_t* data, size_t length);
+pa_packet* pa_packet_new_dynamic(void* data, size_t length);
pa_packet* pa_packet_ref(pa_packet *p);
void pa_packet_unref(pa_packet *p);
diff --git a/src/polypcore/protocol-esound.c b/src/polypcore/protocol-esound.c
index a16ac280..5adff57a 100644
--- a/src/polypcore/protocol-esound.c
+++ b/src/polypcore/protocol-esound.c
@@ -186,6 +186,7 @@ static void connection_free(struct connection *c) {
if (c->sink_input) {
pa_sink_input_disconnect(c->sink_input);
+ pa_log("disconnect\n");
pa_sink_input_unref(c->sink_input);
}
@@ -333,7 +334,15 @@ static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t
}
l = (size_t) (pa_bytes_per_second(&ss)*PLAYBACK_BUFFER_SECONDS);
- c->input_memblockq = pa_memblockq_new(l, 0, pa_frame_size(&ss), l/2, l/PLAYBACK_BUFFER_FRAGMENTS, c->protocol->core->memblock_stat);
+ c->input_memblockq = pa_memblockq_new(
+ 0,
+ l,
+ 0,
+ pa_frame_size(&ss),
+ (size_t) -1,
+ l/PLAYBACK_BUFFER_FRAGMENTS,
+ NULL,
+ c->protocol->core->memblock_stat);
pa_iochannel_socket_set_rcvbuf(c->io, l/PLAYBACK_BUFFER_FRAGMENTS*2);
c->playback.fragment_size = l/10;
@@ -405,7 +414,15 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co
}
l = (size_t) (pa_bytes_per_second(&ss)*RECORD_BUFFER_SECONDS);
- c->output_memblockq = pa_memblockq_new(l, 0, pa_frame_size(&ss), 0, 0, c->protocol->core->memblock_stat);
+ c->output_memblockq = pa_memblockq_new(
+ 0,
+ l,
+ 0,
+ pa_frame_size(&ss),
+ 1,
+ 0,
+ NULL,
+ c->protocol->core->memblock_stat);
pa_iochannel_socket_set_sndbuf(c->io, l/RECORD_BUFFER_FRAGMENTS*2);
c->source_output->owner = c->protocol->module;
@@ -724,8 +741,7 @@ static int do_read(struct connection *c) {
assert(c->read_data_length < sizeof(c->request));
if ((r = pa_iochannel_read(c->io, ((uint8_t*) &c->request) + c->read_data_length, sizeof(c->request) - c->read_data_length)) <= 0) {
- if (r != 0)
- pa_log_warn(__FILE__": read() failed: %s\n", strerror(errno));
+ pa_log_debug(__FILE__": read() failed: %s\n", r < 0 ? strerror(errno) : "EOF");
return -1;
}
@@ -773,8 +789,7 @@ static int do_read(struct connection *c) {
assert(c->read_data && c->read_data_length < handler->data_length);
if ((r = pa_iochannel_read(c->io, (uint8_t*) c->read_data + c->read_data_length, handler->data_length - c->read_data_length)) <= 0) {
- if (r != 0)
- pa_log_warn(__FILE__": read() failed: %s\n", strerror(errno));
+ pa_log_debug(__FILE__": read() failed: %s\n", r < 0 ? strerror(errno) : "EOF");
return -1;
}
@@ -794,8 +809,7 @@ static int do_read(struct connection *c) {
assert(c->scache.memchunk.memblock && c->scache.name && c->scache.memchunk.index < c->scache.memchunk.length);
if ((r = pa_iochannel_read(c->io, (uint8_t*) c->scache.memchunk.memblock->data+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index)) <= 0) {
- if (r!= 0)
- pa_log_warn(__FILE__": read() failed: %s\n", strerror(errno));
+ pa_log_debug(__FILE__": read() failed: %s\n", r < 0 ? strerror(errno) : "EOF");
return -1;
}
@@ -852,13 +866,10 @@ static int do_read(struct connection *c) {
}
if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) {
- if (r != 0)
- pa_log(__FILE__": read() failed: %s\n", strerror(errno));
+ pa_log_debug(__FILE__": read() failed: %s\n", r < 0 ? strerror(errno) : "EOF");
return -1;
}
-/* pa_log(__FILE__": read %u\n", r); */
-
chunk.memblock = c->playback.current_memblock;
chunk.index = c->playback.memblock_index;
chunk.length = r;
@@ -867,7 +878,7 @@ static int do_read(struct connection *c) {
c->playback.memblock_index += r;
assert(c->input_memblockq);
- pa_memblockq_push_align(c->input_memblockq, &chunk, 0);
+ pa_memblockq_push_align(c->input_memblockq, &chunk);
assert(c->sink_input);
pa_sink_notify(c->sink_input->sink);
}
@@ -910,6 +921,8 @@ static int do_write(struct connection *c) {
pa_memblockq_drop(c->output_memblockq, &chunk, r);
pa_memblock_unref(chunk.memblock);
+
+ pa_source_notify(c->source_output->source);
}
return 0;
@@ -921,21 +934,18 @@ static void do_work(struct connection *c) {
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
c->protocol->core->mainloop->defer_enable(c->defer_event, 0);
-/* pa_log("DOWORK %i\n", pa_iochannel_is_hungup(c->io)); */
+ if (c->dead)
+ return;
- if (!c->dead && pa_iochannel_is_readable(c->io))
+ if (pa_iochannel_is_readable(c->io)) {
if (do_read(c) < 0)
goto fail;
+ } else if (pa_iochannel_is_hungup(c->io))
+ goto fail;
- if (!c->dead && pa_iochannel_is_writable(c->io))
+ if (pa_iochannel_is_writable(c->io))
if (do_write(c) < 0)
goto fail;
-
- /* In case the line was hungup, make sure to rerun this function
- as soon as possible, until all data has been read. */
-
- if (!c->dead && pa_iochannel_is_hungup(c->io))
- c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
return;
@@ -943,15 +953,17 @@ fail:
if (c->state == ESD_STREAMING_DATA && c->sink_input) {
c->dead = 1;
- pa_memblockq_prebuf_disable(c->input_memblockq);
pa_iochannel_free(c->io);
c->io = NULL;
-
+
+ pa_memblockq_prebuf_disable(c->input_memblockq);
+ pa_sink_notify(c->sink_input->sink);
} else
connection_free(c);
}
+
static void io_callback(pa_iochannel*io, void *userdata) {
struct connection *c = userdata;
assert(io && c && c->io == io);
@@ -1024,7 +1036,7 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
struct connection *c = o->userdata;
assert(o && c && chunk);
- pa_memblockq_push(c->output_memblockq, chunk, 0);
+ pa_memblockq_push(c->output_memblockq, chunk);
/* do something */
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
diff --git a/src/polypcore/protocol-native.c b/src/polypcore/protocol-native.c
index 1362fdf2..aaa4fc48 100644
--- a/src/polypcore/protocol-native.c
+++ b/src/polypcore/protocol-native.c
@@ -48,6 +48,8 @@
#include <polypcore/authkey-prop.h>
#include <polypcore/strlist.h>
#include <polypcore/props.h>
+#include <polypcore/sample-util.h>
+#include <polypcore/llist.h>
#include "protocol-native.h"
@@ -77,6 +79,11 @@ struct playback_stream {
size_t requested_bytes;
int drain_request;
uint32_t drain_tag;
+ uint32_t syncid;
+ int underrun;
+
+ /* Sync group members */
+ PA_LLIST_FIELDS(struct playback_stream);
};
struct upload_stream {
@@ -153,7 +160,8 @@ static void command_get_server_info(pa_pdispatch *pd, uint32_t command, uint32_t
static void command_subscribe(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_volume(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_cork_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
-static void command_flush_or_trigger_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+static void command_flush_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
+static void command_trigger_or_prebuf_playback_stream(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_default_sink_or_source(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_set_stream_name(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
static void command_kill(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata);
@@ -210,9 +218,9 @@ static const pa_pdispatch_callback command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_SET_SINK_INPUT_VOLUME] = command_set_volume,
[PA_COMMAND_CORK_PLAYBACK_STREAM] = command_cork_playback_stream,
- [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_flush_or_trigger_playback_stream,
- [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_flush_or_trigger_playback_stream,
- [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_flush_or_trigger_playback_stream,
+ [PA_COMMAND_FLUSH_PLAYBACK_STREAM] = command_flush_playback_stream,
+ [PA_COMMAND_TRIGGER_PLAYBACK_STREAM] = command_trigger_or_prebuf_playback_stream,
+ [PA_COMMAND_PREBUF_PLAYBACK_STREAM] = command_trigger_or_prebuf_playback_stream,
[PA_COMMAND_CORK_RECORD_STREAM] = command_cork_record_stream,
[PA_COMMAND_FLUSH_RECORD_STREAM] = command_flush_record_stream,
@@ -244,7 +252,7 @@ static struct upload_stream* upload_stream_new(
struct upload_stream *s;
assert(c && ss && name && length);
- s = pa_xmalloc(sizeof(struct upload_stream));
+ s = pa_xnew(struct upload_stream, 1);
s->type = UPLOAD_STREAM;
s->connection = c;
s->sample_spec = *ss;
@@ -291,7 +299,7 @@ static struct record_stream* record_stream_new(
if (!(source_output = pa_source_output_new(source, __FILE__, name, ss, map, -1)))
return NULL;
- s = pa_xmalloc(sizeof(struct record_stream));
+ s = pa_xnew(struct record_stream, 1);
s->connection = c;
s->source_output = source_output;
s->source_output->push = source_output_push_cb;
@@ -301,7 +309,15 @@ static struct record_stream* record_stream_new(
s->source_output->owner = c->protocol->module;
s->source_output->client = c->client;
- s->memblockq = pa_memblockq_new(maxlength, 0, base = pa_frame_size(ss), 0, 0, c->protocol->core->memblock_stat);
+ s->memblockq = pa_memblockq_new(
+ 0,
+ maxlength,
+ 0,
+ base = pa_frame_size(ss),
+ 1,
+ 0,
+ NULL,
+ c->protocol->core->memblock_stat);
assert(s->memblockq);
s->fragment_size = (fragment_size/base)*base;
@@ -332,19 +348,40 @@ static struct playback_stream* playback_stream_new(
size_t tlength,
size_t prebuf,
size_t minreq,
- pa_cvolume *volume) {
+ pa_cvolume *volume,
+ uint32_t syncid) {
- struct playback_stream *s;
+ struct playback_stream *s, *sync;
pa_sink_input *sink_input;
+ pa_memblock *silence;
+ uint32_t idx;
+ int64_t start_index;
+
assert(c && sink && ss && name && maxlength);
+ /* Find syncid group */
+ for (sync = pa_idxset_first(c->output_streams, &idx); sync; sync = pa_idxset_next(c->output_streams, &idx)) {
+
+ if (sync->type != PLAYBACK_STREAM)
+ continue;
+
+ if (sync->syncid == syncid)
+ break;
+ }
+
+ /* Synced streams must connect to the same sink */
+ if (sync && sync->sink_input->sink != sink)
+ return NULL;
+
if (!(sink_input = pa_sink_input_new(sink, __FILE__, name, ss, map, 0, -1)))
return NULL;
- s = pa_xmalloc(sizeof(struct playback_stream));
+ s = pa_xnew(struct playback_stream, 1);
s->type = PLAYBACK_STREAM;
s->connection = c;
+ s->syncid = syncid;
s->sink_input = sink_input;
+ s->underrun = 1;
s->sink_input->peek = sink_input_peek_cb;
s->sink_input->drop = sink_input_drop_cb;
@@ -353,24 +390,56 @@ static struct playback_stream* playback_stream_new(
s->sink_input->userdata = s;
s->sink_input->owner = c->protocol->module;
s->sink_input->client = c->client;
-
- s->memblockq = pa_memblockq_new(maxlength, tlength, pa_frame_size(ss), prebuf, minreq, c->protocol->core->memblock_stat);
- assert(s->memblockq);
+ if (sync) {
+ /* Sync id found, now find head of list */
+ PA_LLIST_FIND_HEAD(struct playback_stream, sync, &sync);
+
+ /* Prepend ourselves */
+ PA_LLIST_PREPEND(struct playback_stream, sync, s);
+
+ /* Set our start index to the current read index of the other grozp member(s) */
+ assert(sync->next);
+ start_index = pa_memblockq_get_read_index(sync->next->memblockq);
+ } else {
+ /* This ia a new sync group */
+ PA_LLIST_INIT(struct playback_stream, s);
+ start_index = 0;
+ }
+
+ silence = pa_silence_memblock_new(ss, 0, c->protocol->core->memblock_stat);
+
+ s->memblockq = pa_memblockq_new(
+ start_index,
+ maxlength,
+ tlength,
+ pa_frame_size(ss),
+ prebuf,
+ minreq,
+ silence,
+ c->protocol->core->memblock_stat);
+
+ pa_memblock_unref(silence);
+
s->requested_bytes = 0;
s->drain_request = 0;
s->sink_input->volume = *volume;
pa_idxset_put(c->output_streams, s, &s->index);
+
return s;
}
static void playback_stream_free(struct playback_stream* p) {
+ struct playback_stream *head;
assert(p && p->connection);
if (p->drain_request)
- pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(p->connection->pstream, p->drain_tag, PA_ERR_NOENTITY);
+
+ PA_LLIST_FIND_HEAD(struct playback_stream, p, &head);
+ PA_LLIST_REMOVE(struct playback_stream, head, p);
pa_idxset_remove_by_data(p->connection->output_streams, p, NULL);
pa_sink_input_disconnect(p->sink_input);
@@ -436,7 +505,7 @@ static void request_bytes(struct playback_stream *s) {
pa_tagstruct_putu32(t, l);
pa_pstream_send_tagstruct(s->connection->pstream, t);
-/* pa_log(__FILE__": Requesting %u bytes\n", l); */
+/* pa_log(__FILE__": Requesting %u bytes\n", l); */
}
static void send_memblock(struct connection *c) {
@@ -461,7 +530,7 @@ static void send_memblock(struct connection *c) {
if (schunk.length > r->fragment_size)
schunk.length = r->fragment_size;
- pa_pstream_send_memblock(c->pstream, r->index, 0, &schunk);
+ pa_pstream_send_memblock(c->pstream, r->index, 0, PA_SEEK_RELATIVE, &schunk);
pa_memblockq_drop(r->memblockq, &chunk, schunk.length);
pa_memblock_unref(schunk.memblock);
@@ -501,9 +570,27 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
assert(i && i->userdata && chunk);
s = i->userdata;
- if (pa_memblockq_peek(s->memblockq, chunk) < 0)
+ if (pa_memblockq_get_length(s->memblockq) <= 0 && !s->underrun) {
+ pa_tagstruct *t;
+
+ /* Report that we're empty */
+
+ t = pa_tagstruct_new(NULL, 0);
+ pa_tagstruct_putu32(t, PA_COMMAND_UNDERFLOW);
+ pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+ pa_tagstruct_putu32(t, s->index);
+ pa_pstream_send_tagstruct(s->connection->pstream, t);
+
+ s->underrun = 1;
+ }
+
+ if (pa_memblockq_peek(s->memblockq, chunk) < 0) {
+ pa_log(__FILE__": peek: failure\n");
return -1;
+ }
+/* pa_log(__FILE__": peek: %u\n", chunk->length); */
+
return 0;
}
@@ -513,6 +600,7 @@ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_
s = i->userdata;
pa_memblockq_drop(s->memblockq, chunk, length);
+
request_bytes(s);
if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
@@ -520,7 +608,7 @@ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_
s->drain_request = 0;
}
-/* pa_log(__FILE__": after_drop: %u\n", pa_memblockq_get_length(s->memblockq)); */
+/* pa_log(__FILE__": after_drop: %u %u\n", pa_memblockq_get_length(s->memblockq), pa_memblockq_is_readable(s->memblockq)); */
}
static void sink_input_kill_cb(pa_sink_input *i) {
@@ -546,7 +634,11 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
assert(o && o->userdata && chunk);
s = o->userdata;
- pa_memblockq_push_align(s->memblockq, chunk, 0);
+ if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
+ pa_log_warn(__FILE__": Failed to push data into output queue.\n");
+ return;
+ }
+
if (!pa_pstream_is_pending(s->connection->pstream))
send_memblock(s->connection);
}
@@ -578,7 +670,7 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
struct connection *c = userdata;
struct playback_stream *s;
size_t maxlength, tlength, prebuf, minreq;
- uint32_t sink_index;
+ uint32_t sink_index, syncid;
const char *name, *sink_name;
pa_sample_spec ss;
pa_channel_map map;
@@ -601,6 +693,7 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
PA_TAG_U32, &tlength,
PA_TAG_U32, &prebuf,
PA_TAG_U32, &minreq,
+ PA_TAG_U32, &syncid,
PA_TAG_CVOLUME, &volume,
PA_TAG_INVALID) < 0 ||
!pa_tagstruct_eof(t) ||
@@ -610,23 +703,23 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
- if (sink_index != (uint32_t) -1)
+ if (sink_index != PA_INVALID_INDEX)
sink = pa_idxset_get_by_index(c->protocol->core->sinks, sink_index);
else
sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK, 1);
if (!sink) {
- pa_log("%s: Can't find a suitable sink.\n", __FILE__);
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_log_warn(__FILE__": Can't find a suitable sink.\n");
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
- if (!(s = playback_stream_new(c, sink, &ss, &map, name, maxlength, tlength, prebuf, minreq, &volume))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
+ if (!(s = playback_stream_new(c, sink, &ss, &map, name, maxlength, tlength, prebuf, minreq, &volume, syncid))) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
return;
}
@@ -656,14 +749,14 @@ static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (command == PA_COMMAND_DELETE_PLAYBACK_STREAM) {
struct playback_stream *s;
if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != PLAYBACK_STREAM)) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
return;
}
@@ -671,7 +764,7 @@ static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
} else if (command == PA_COMMAND_DELETE_RECORD_STREAM) {
struct record_stream *s;
if (!(s = pa_idxset_get_by_index(c->record_streams, channel))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
return;
}
@@ -680,7 +773,7 @@ static void command_delete_stream(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
struct upload_stream *s;
assert(command == PA_COMMAND_DELETE_UPLOAD_STREAM);
if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
return;
}
@@ -717,7 +810,7 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -727,12 +820,12 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
source = pa_namereg_get(c->protocol->core, source_name, PA_NAMEREG_SOURCE, 1);
if (!source) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
if (!(s = record_stream_new(c, source, &ss, &map, name, maxlength, fragment_size))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
return;
}
@@ -758,7 +851,7 @@ static void command_exit(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -782,7 +875,7 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
if (!c->authorized) {
if (memcmp(c->protocol->auth_cookie, cookie, PA_NATIVE_COOKIE_LENGTH) != 0) {
pa_log(__FILE__": Denied access to client with invalid authorization key.\n");
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -826,7 +919,7 @@ static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uin
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -842,7 +935,7 @@ static void command_lookup(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uin
}
if (idx == PA_IDXSET_INVALID)
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
else {
pa_tagstruct *reply;
reply = pa_tagstruct_new(NULL, 0);
@@ -867,12 +960,12 @@ static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->output_streams, idx)) || s->type != PLAYBACK_STREAM) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -881,10 +974,10 @@ static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC
pa_memblockq_prebuf_disable(s->memblockq);
if (!pa_memblockq_is_readable(s->memblockq)) {
-/* pa_log("immediate drain: %u\n", pa_memblockq_get_length(s->memblockq)); */
+/* pa_log("immediate drain: %u\n", pa_memblockq_get_length(s->memblockq)); */
pa_pstream_send_simple_ack(c->pstream, tag);
} else {
-/* pa_log("slow drain triggered\n"); */
+/* pa_log("slow drain triggered\n"); */
s->drain_request = 1;
s->drain_tag = tag;
@@ -903,7 +996,7 @@ static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -937,12 +1030,12 @@ static void command_get_playback_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->output_streams, idx)) || s->type != PLAYBACK_STREAM) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -980,12 +1073,12 @@ static void command_get_record_latency(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UN
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1026,17 +1119,17 @@ static void command_create_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if ((length % pa_frame_size(&ss)) != 0 || length <= 0 || !*name) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
return;
}
if (!(s = upload_stream_new(c, &ss, &map, name, length))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_INVALID);
return;
}
@@ -1063,12 +1156,12 @@ static void command_finish_upload_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->output_streams, channel)) || (s->type != UPLOAD_STREAM)) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
return;
}
@@ -1095,7 +1188,7 @@ static void command_play_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1105,12 +1198,12 @@ static void command_play_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui
sink = pa_namereg_get(c->protocol->core, sink_name, PA_NAMEREG_SINK, 1);
if (!sink) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
if (pa_scache_play_item(c->protocol->core, name, sink, &volume) < 0) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1129,12 +1222,12 @@ static void command_remove_sample(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (pa_scache_remove_item(c->protocol->core, name) < 0) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1261,7 +1354,7 @@ static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, u
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1292,7 +1385,7 @@ static void command_get_info(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, u
}
if (!sink && !source && !client && !module && !si && !so && !sce) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1331,7 +1424,7 @@ static void command_get_info_list(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t comma
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1394,7 +1487,7 @@ static void command_get_server_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1444,7 +1537,7 @@ static void command_subscribe(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1478,7 +1571,7 @@ static void command_set_volume(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command,
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1493,7 +1586,7 @@ static void command_set_volume(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command,
}
if (!si && !sink) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1509,7 +1602,7 @@ static void command_cork_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
struct connection *c = userdata;
uint32_t idx;
int b;
- struct playback_stream *s;
+ struct playback_stream *s, *sync;
assert(c && t);
if (pa_tagstruct_getu32(t, &idx) < 0 ||
@@ -1520,20 +1613,82 @@ static void command_cork_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->output_streams, idx)) || s->type != PLAYBACK_STREAM) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
+ fprintf(stderr, "Corking %i\n", b);
+
pa_sink_input_cork(s->sink_input, b);
+ pa_memblockq_prebuf_force(s->memblockq);
+
+ /* Do the same for all other members in the sync group */
+ for (sync = s->prev; sync; sync = sync->prev) {
+ pa_sink_input_cork(sync->sink_input, b);
+ pa_memblockq_prebuf_force(sync->memblockq);
+ }
+
+ for (sync = s->next; sync; sync = sync->next) {
+ pa_sink_input_cork(sync->sink_input, b);
+ pa_memblockq_prebuf_force(sync->memblockq);
+ }
+
+ pa_pstream_send_simple_ack(c->pstream, tag);
+}
+
+static void command_flush_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
+ struct connection *c = userdata;
+ uint32_t idx;
+ struct playback_stream *s, *sync;
+ assert(c && t);
+
+ if (pa_tagstruct_getu32(t, &idx) < 0 ||
+ !pa_tagstruct_eof(t)) {
+ protocol_error(c);
+ return;
+ }
+
+ if (!c->authorized) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
+ return;
+ }
+
+ if (!(s = pa_idxset_get_by_index(c->output_streams, idx)) || s->type != PLAYBACK_STREAM) {
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
+ return;
+ }
+
+ pa_memblockq_flush(s->memblockq);
+ s->underrun = 0;
+
+ /* Do the same for all other members in the sync group */
+ for (sync = s->prev; sync; sync = sync->prev) {
+ pa_memblockq_flush(sync->memblockq);
+ sync->underrun = 0;
+ }
+
+ for (sync = s->next; sync; sync = sync->next) {
+ pa_memblockq_flush(sync->memblockq);
+ sync->underrun = 0;
+ }
+
pa_pstream_send_simple_ack(c->pstream, tag);
+ pa_sink_notify(s->sink_input->sink);
+ request_bytes(s);
+
+ for (sync = s->prev; sync; sync = sync->prev)
+ request_bytes(sync);
+
+ for (sync = s->next; sync; sync = sync->next)
+ request_bytes(sync);
}
-static void command_flush_or_trigger_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
+static void command_trigger_or_prebuf_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
struct connection *c = userdata;
uint32_t idx;
struct playback_stream *s;
@@ -1546,23 +1701,26 @@ static void command_flush_or_trigger_playback_stream(PA_GCC_UNUSED pa_pdispatch
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->output_streams, idx)) || s->type != PLAYBACK_STREAM) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
- if (command == PA_COMMAND_PREBUF_PLAYBACK_STREAM)
- pa_memblockq_prebuf_reenable(s->memblockq);
- else if (command == PA_COMMAND_TRIGGER_PLAYBACK_STREAM)
- pa_memblockq_prebuf_disable(s->memblockq);
- else {
- assert(command == PA_COMMAND_FLUSH_PLAYBACK_STREAM);
- pa_memblockq_flush(s->memblockq);
- /*pa_log(__FILE__": flush: %u\n", pa_memblockq_get_length(s->memblockq));*/
+ switch (command) {
+ case PA_COMMAND_PREBUF_PLAYBACK_STREAM:
+ pa_memblockq_prebuf_force(s->memblockq);
+ break;
+
+ case PA_COMMAND_TRIGGER_PLAYBACK_STREAM:
+ pa_memblockq_prebuf_disable(s->memblockq);
+ break;
+
+ default:
+ abort();
}
pa_sink_notify(s->sink_input->sink);
@@ -1585,16 +1743,17 @@ static void command_cork_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UN
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
pa_source_output_cork(s->source_output, b);
+ pa_memblockq_prebuf_force(s->memblockq);
pa_pstream_send_simple_ack(c->pstream, tag);
}
@@ -1611,12 +1770,12 @@ static void command_flush_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_U
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1638,7 +1797,7 @@ static void command_set_default_sink_or_source(PA_GCC_UNUSED pa_pdispatch *pd, u
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1660,7 +1819,7 @@ static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t com
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1668,7 +1827,7 @@ static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t com
struct playback_stream *s;
if (!(s = pa_idxset_get_by_index(c->output_streams, idx)) || s->type != PLAYBACK_STREAM) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1678,7 +1837,7 @@ static void command_set_stream_name(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t com
struct record_stream *s;
if (!(s = pa_idxset_get_by_index(c->record_streams, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1700,7 +1859,7 @@ static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint3
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1708,7 +1867,7 @@ static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint3
pa_client *client;
if (!(client = pa_idxset_get_by_index(c->protocol->core->clients, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1717,7 +1876,7 @@ static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint3
pa_sink_input *s;
if (!(s = pa_idxset_get_by_index(c->protocol->core->sink_inputs, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1728,7 +1887,7 @@ static void command_kill(PA_GCC_UNUSED pa_pdispatch *pd, uint32_t command, uint3
assert(command == PA_COMMAND_KILL_SOURCE_OUTPUT);
if (!(s = pa_idxset_get_by_index(c->protocol->core->source_outputs, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1753,12 +1912,12 @@ static void command_load_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED ui
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(m = pa_module_load(c->protocol->core, name, argument))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_INITFAILED);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_MODINITFAILED);
return;
}
@@ -1782,12 +1941,12 @@ static void command_unload_module(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (!(m = pa_idxset_get_by_index(c->protocol->core->modules, idx))) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1813,12 +1972,12 @@ static void command_add_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED u
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
if (pa_autoload_add(c->protocol->core, name, type == 0 ? PA_NAMEREG_SINK : PA_NAMEREG_SOURCE, module, argument, &idx) < 0) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_EXIST);
return;
}
@@ -1847,7 +2006,7 @@ static void command_remove_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1857,7 +2016,7 @@ static void command_remove_autoload(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSE
r = pa_autoload_remove_by_index(c->protocol->core, idx);
if (r < 0) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1893,7 +2052,7 @@ static void command_get_autoload_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNU
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1904,7 +2063,7 @@ static void command_get_autoload_info(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNU
a = pa_autoload_get_by_index(c->protocol->core, idx);
if (!a) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_NOENTITY);
return;
}
@@ -1927,7 +2086,7 @@ static void command_get_autoload_info_list(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
}
if (!c->authorized) {
- pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
+ pa_pstream_send_error(c->pstream, tag, PA_ERR_ACCESS);
return;
}
@@ -1958,7 +2117,7 @@ static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, void *user
}
}
-static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, uint32_t delta, const pa_memchunk *chunk, void *userdata) {
+static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata) {
struct connection *c = userdata;
struct output_stream *stream;
assert(p && chunk && userdata);
@@ -1975,13 +2134,30 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, uint32_t
ps->requested_bytes = 0;
else
ps->requested_bytes -= chunk->length;
-
- pa_memblockq_push_align(ps->memblockq, chunk, delta);
- assert(ps->sink_input);
-/* pa_log(__FILE__": after_recv: %u\n", pa_memblockq_get_length(p->memblockq)); */
+ pa_memblockq_seek(ps->memblockq, offset, seek);
+
+ if (pa_memblockq_push_align(ps->memblockq, chunk) < 0) {
+ pa_tagstruct *t;
+
+ pa_log_warn(__FILE__": failed to push data into queue\n");
+
+ /* Pushing this block into the queue failed, so we simulate
+ * it by skipping ahead */
+
+ pa_memblockq_seek(ps->memblockq, chunk->length, PA_SEEK_RELATIVE);
+
+ /* Notify the user */
+ t = pa_tagstruct_new(NULL, 0);
+ pa_tagstruct_putu32(t, PA_COMMAND_OVERFLOW);
+ pa_tagstruct_putu32(t, (uint32_t) -1); /* tag */
+ pa_tagstruct_putu32(t, ps->index);
+ pa_pstream_send_tagstruct(p, t);
+ }
+
+ ps->underrun = 0;
+
pa_sink_notify(ps->sink_input->sink);
-/* pa_log(__FILE__": Recieved %u bytes.\n", chunk->length); */
} else {
struct upload_stream *u = (struct upload_stream*) stream;
diff --git a/src/polypcore/protocol-simple.c b/src/polypcore/protocol-simple.c
index 4d3f8e1d..fac54239 100644
--- a/src/polypcore/protocol-simple.c
+++ b/src/polypcore/protocol-simple.c
@@ -52,6 +52,8 @@ struct connection {
pa_memblockq *input_memblockq, *output_memblockq;
pa_defer_event *defer_event;
+ int dead;
+
struct {
pa_memblock *current_memblock;
size_t memblock_index, fragment_size;
@@ -130,7 +132,7 @@ static int do_read(struct connection *c) {
}
if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) {
- pa_log(__FILE__": read() failed: %s\n", r == 0 ? "EOF" : strerror(errno));
+ pa_log_debug(__FILE__": read() failed: %s\n", r == 0 ? "EOF" : strerror(errno));
return -1;
}
@@ -142,7 +144,7 @@ static int do_read(struct connection *c) {
c->playback.memblock_index += r;
assert(c->input_memblockq);
- pa_memblockq_push_align(c->input_memblockq, &chunk, 0);
+ pa_memblockq_push_align(c->input_memblockq, &chunk);
assert(c->sink_input);
pa_sink_notify(c->sink_input->sink);
@@ -170,32 +172,46 @@ static int do_write(struct connection *c) {
pa_memblockq_drop(c->output_memblockq, &chunk, r);
pa_memblock_unref(chunk.memblock);
+
+ pa_source_notify(c->source_output->source);
return 0;
}
-
static void do_work(struct connection *c) {
assert(c);
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
c->protocol->core->mainloop->defer_enable(c->defer_event, 0);
- if (pa_iochannel_is_writable(c->io))
- if (do_write(c) < 0)
- goto fail;
+ if (c->dead)
+ return;
- if (pa_iochannel_is_readable(c->io))
+ if (pa_iochannel_is_readable(c->io)) {
if (do_read(c) < 0)
goto fail;
+ } else if (pa_iochannel_is_hungup(c->io))
+ goto fail;
- if (pa_iochannel_is_hungup(c->io))
- c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
+ if (pa_iochannel_is_writable(c->io)) {
+ if (do_write(c) < 0)
+ goto fail;
+ }
return;
fail:
- connection_free(c);
+
+ if (c->sink_input) {
+ c->dead = 1;
+
+ pa_iochannel_free(c->io);
+ c->io = NULL;
+
+ pa_memblockq_prebuf_disable(c->input_memblockq);
+ pa_sink_notify(c->sink_input->sink);
+ } else
+ connection_free(c);
}
/*** sink_input callbacks ***/
@@ -205,8 +221,13 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
assert(i && i->userdata && chunk);
c = i->userdata;
- if (pa_memblockq_peek(c->input_memblockq, chunk) < 0)
+ if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) {
+
+ if (c->dead)
+ connection_free(c);
+
return -1;
+ }
return 0;
}
@@ -240,7 +261,7 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
struct connection *c = o->userdata;
assert(o && c && chunk);
- pa_memblockq_push(c->output_memblockq, chunk, 0);
+ pa_memblockq_push(c->output_memblockq, chunk);
/* do something */
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
@@ -307,6 +328,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
c->playback.current_memblock = NULL;
c->playback.memblock_index = 0;
c->playback.fragment_size = 0;
+ c->dead = 0;
pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname));
c->client = pa_client_new(p->core, __FILE__, cname);
@@ -339,7 +361,15 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
c->sink_input->userdata = c;
l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
- c->input_memblockq = pa_memblockq_new(l, 0, pa_frame_size(&p->sample_spec), l/2, l/PLAYBACK_BUFFER_FRAGMENTS, p->core->memblock_stat);
+ c->input_memblockq = pa_memblockq_new(
+ 0,
+ l,
+ 0,
+ pa_frame_size(&p->sample_spec),
+ (size_t) -1,
+ l/PLAYBACK_BUFFER_FRAGMENTS,
+ NULL,
+ p->core->memblock_stat);
assert(c->input_memblockq);
pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
c->playback.fragment_size = l/10;
@@ -368,7 +398,15 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
c->source_output->userdata = c;
l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);
- c->output_memblockq = pa_memblockq_new(l, 0, pa_frame_size(&p->sample_spec), 0, 0, p->core->memblock_stat);
+ c->output_memblockq = pa_memblockq_new(
+ 0,
+ l,
+ 0,
+ pa_frame_size(&p->sample_spec),
+ 1,
+ 0,
+ NULL,
+ p->core->memblock_stat);
pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
}
diff --git a/src/polypcore/pstream.c b/src/polypcore/pstream.c
index eec93a0f..c697dc3d 100644
--- a/src/polypcore/pstream.c
+++ b/src/polypcore/pstream.c
@@ -40,12 +40,14 @@
#include "pstream.h"
-typedef enum pa_pstream_descriptor_index {
+enum {
PA_PSTREAM_DESCRIPTOR_LENGTH,
PA_PSTREAM_DESCRIPTOR_CHANNEL,
- PA_PSTREAM_DESCRIPTOR_DELTA,
+ PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
+ PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
+ PA_PSTREAM_DESCRIPTOR_SEEK,
PA_PSTREAM_DESCRIPTOR_MAX
-} pa_pstream_descriptor_index;
+};
typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
@@ -58,7 +60,8 @@ struct item_info {
/* memblock info */
pa_memchunk chunk;
uint32_t channel;
- uint32_t delta;
+ int64_t offset;
+ pa_seek_mode_t seek_mode;
/* packet info */
pa_packet *packet;
@@ -94,7 +97,7 @@ struct pa_pstream {
void (*recieve_packet_callback) (pa_pstream *p, pa_packet *packet, void *userdata);
void *recieve_packet_callback_userdata;
- void (*recieve_memblock_callback) (pa_pstream *p, uint32_t channel, uint32_t delta, const pa_memchunk *chunk, void *userdata);
+ void (*recieve_memblock_callback) (pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata);
void *recieve_memblock_callback_userdata;
void (*drain_callback)(pa_pstream *p, void *userdata);
@@ -103,8 +106,8 @@ struct pa_pstream {
pa_memblock_stat *memblock_stat;
};
-static void do_write(pa_pstream *p);
-static void do_read(pa_pstream *p);
+static int do_write(pa_pstream *p);
+static int do_read(pa_pstream *p);
static void do_something(pa_pstream *p) {
assert(p);
@@ -112,31 +115,47 @@ static void do_something(pa_pstream *p) {
p->mainloop->defer_enable(p->defer_event, 0);
pa_pstream_ref(p);
-
- if (!p->dead && pa_iochannel_is_readable(p->io))
- do_read(p);
- if (!p->dead && pa_iochannel_is_writable(p->io))
- do_write(p);
+ if (!p->dead && pa_iochannel_is_readable(p->io)) {
+ if (do_read(p) < 0)
+ goto fail;
+ } else if (!p->dead && pa_iochannel_is_hungup(p->io))
+ goto fail;
+
+ if (!p->dead && pa_iochannel_is_writable(p->io)) {
+ if (do_write(p) < 0)
+ goto fail;
+ }
+
+ pa_pstream_unref(p);
+ return;
+
+fail:
- /* In case the line was hungup, make sure to rerun this function
- as soon as possible, until all data has been read. */
+ p->dead = 1;
- if (!p->dead && pa_iochannel_is_hungup(p->io))
- p->mainloop->defer_enable(p->defer_event, 1);
+ if (p->die_callback)
+ p->die_callback(p, p->die_callback_userdata);
pa_pstream_unref(p);
}
static void io_callback(pa_iochannel*io, void *userdata) {
pa_pstream *p = userdata;
- assert(p && p->io == io);
+
+ assert(p);
+ assert(p->io == io);
+
do_something(p);
}
static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
pa_pstream *p = userdata;
- assert(p && p->defer_event == e && p->mainloop == m);
+
+ assert(p);
+ assert(p->defer_event == e);
+ assert(p->mainloop == m);
+
do_something(p);
}
@@ -144,7 +163,8 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_sta
pa_pstream *p;
assert(io);
- p = pa_xmalloc(sizeof(pa_pstream));
+ p = pa_xnew(pa_pstream, 1);
+
p->ref = 1;
p->io = io;
pa_iochannel_set_callback(io, io_callback, p);
@@ -228,7 +248,7 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet) {
/* pa_log(__FILE__": push-packet %p\n", packet); */
- i = pa_xmalloc(sizeof(struct item_info));
+ i = pa_xnew(struct item_info, 1);
i->type = PA_PSTREAM_ITEM_PACKET;
i->packet = pa_packet_ref(packet);
@@ -236,7 +256,7 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet) {
p->mainloop->defer_enable(p->defer_event, 1);
}
-void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, uint32_t delta, const pa_memchunk *chunk) {
+void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
struct item_info *i;
assert(p && channel != (uint32_t) -1 && chunk && p->ref >= 1);
@@ -245,11 +265,12 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, uint32_t delta, co
/* pa_log(__FILE__": push-memblock %p\n", chunk); */
- i = pa_xmalloc(sizeof(struct item_info));
+ i = pa_xnew(struct item_info, 1);
i->type = PA_PSTREAM_ITEM_MEMBLOCK;
i->chunk = *chunk;
i->channel = channel;
- i->delta = delta;
+ i->offset = offset;
+ i->seek_mode = seek_mode;
pa_memblock_ref(i->chunk.memblock);
@@ -264,7 +285,7 @@ void pa_pstream_set_recieve_packet_callback(pa_pstream *p, void (*callback) (pa_
p->recieve_packet_callback_userdata = userdata;
}
-void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, void (*callback) (pa_pstream *p, uint32_t channel, uint32_t delta, const pa_memchunk *chunk, void *userdata), void *userdata) {
+void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, void (*callback) (pa_pstream *p, uint32_t channel, int64_t delta, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata), void *userdata) {
assert(p && callback);
p->recieve_memblock_callback = callback;
@@ -286,17 +307,21 @@ static void prepare_next_write_item(pa_pstream *p) {
p->write.data = p->write.current->packet->data;
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA] = 0;
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = 0;
} else {
assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA] = htonl(p->write.current->delta);
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = htonl(p->write.current->seek_mode);
}
}
-static void do_write(pa_pstream *p) {
+static int do_write(pa_pstream *p) {
void *d;
size_t l;
ssize_t r;
@@ -306,7 +331,7 @@ static void do_write(pa_pstream *p) {
prepare_next_write_item(p);
if (!p->write.current)
- return;
+ return 0;
assert(p->write.data);
@@ -319,7 +344,7 @@ static void do_write(pa_pstream *p) {
}
if ((r = pa_iochannel_write(p->io, d, l)) < 0)
- goto die;
+ return -1;
p->write.index += r;
@@ -332,15 +357,10 @@ static void do_write(pa_pstream *p) {
p->drain_callback(p, p->drain_userdata);
}
- return;
-
-die:
- p->dead = 1;
- if (p->die_callback)
- p->die_callback(p, p->die_callback_userdata);
+ return 0;
}
-static void do_read(pa_pstream *p) {
+static int do_read(pa_pstream *p) {
void *d;
size_t l;
ssize_t r;
@@ -356,7 +376,7 @@ static void do_read(pa_pstream *p) {
}
if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
- goto die;
+ return -1;
p->read.index += r;
@@ -365,8 +385,8 @@ static void do_read(pa_pstream *p) {
/* Frame size too large */
if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX) {
- pa_log(__FILE__": Frame size too large\n");
- goto die;
+ pa_log_warn(__FILE__": Frame size too large\n");
+ return -1;
}
assert(!p->read.packet && !p->read.memblock);
@@ -374,13 +394,16 @@ static void do_read(pa_pstream *p) {
if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
/* Frame is a packet frame */
p->read.packet = pa_packet_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]));
- assert(p->read.packet);
p->read.data = p->read.packet->data;
} else {
/* Frame is a memblock frame */
p->read.memblock = pa_memblock_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), p->memblock_stat);
- assert(p->read.memblock);
p->read.data = p->read.memblock->data;
+
+ if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]) > PA_SEEK_RELATIVE_END) {
+ pa_log_warn(__FILE__": Invalid seek mode\n");
+ return -1;
+ }
}
} else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
@@ -396,13 +419,26 @@ static void do_read(pa_pstream *p) {
chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
chunk.length = l;
- if (p->recieve_memblock_callback)
+ if (p->recieve_memblock_callback) {
+ int64_t offset;
+
+ offset = (int64_t) (
+ (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
+ (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
+
p->recieve_memblock_callback(
p,
ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]),
+ offset,
+ ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]),
&chunk,
p->recieve_memblock_callback_userdata);
+ }
+
+ /* Drop seek info for following callbacks */
+ p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] =
+ p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
+ p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
}
}
@@ -427,13 +463,7 @@ static void do_read(pa_pstream *p) {
}
}
- return;
-
-die:
- p->dead = 1;
- if (p->die_callback)
- p->die_callback(p, p->die_callback_userdata);
-
+ return 0;
}
void pa_pstream_set_die_callback(pa_pstream *p, void (*callback)(pa_pstream *p, void *userdata), void *userdata) {
@@ -453,20 +483,24 @@ int pa_pstream_is_pending(pa_pstream *p) {
void pa_pstream_set_drain_callback(pa_pstream *p, void (*cb)(pa_pstream *p, void *userdata), void *userdata) {
assert(p);
+ assert(p->ref >= 1);
p->drain_callback = cb;
p->drain_userdata = userdata;
}
void pa_pstream_unref(pa_pstream*p) {
- assert(p && p->ref >= 1);
+ assert(p);
+ assert(p->ref >= 1);
- if (!(--(p->ref)))
+ if (--p->ref == 0)
pstream_free(p);
}
pa_pstream* pa_pstream_ref(pa_pstream*p) {
- assert(p && p->ref >= 1);
+ assert(p);
+ assert(p->ref >= 1);
+
p->ref++;
return p;
}
diff --git a/src/polypcore/pstream.h b/src/polypcore/pstream.h
index 10ce58f6..741ba9b5 100644
--- a/src/polypcore/pstream.h
+++ b/src/polypcore/pstream.h
@@ -25,6 +25,7 @@
#include <inttypes.h>
#include <polyp/mainloop-api.h>
+#include <polyp/def.h>
#include <polypcore/packet.h>
#include <polypcore/memblock.h>
#include <polypcore/iochannel.h>
@@ -37,10 +38,10 @@ void pa_pstream_unref(pa_pstream*p);
pa_pstream* pa_pstream_ref(pa_pstream*p);
void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet);
-void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, uint32_t delta, const pa_memchunk *chunk);
+void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk);
void pa_pstream_set_recieve_packet_callback(pa_pstream *p, void (*callback) (pa_pstream *p, pa_packet *packet, void *userdata), void *userdata);
-void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, void (*callback) (pa_pstream *p, uint32_t channel, uint32_t delta, const pa_memchunk *chunk, void *userdata), void *userdata);
+void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, void (*callback) (pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata), void *userdata);
void pa_pstream_set_drain_callback(pa_pstream *p, void (*cb)(pa_pstream *p, void *userdata), void *userdata);
void pa_pstream_set_die_callback(pa_pstream *p, void (*callback)(pa_pstream *p, void *userdata), void *userdata);
diff --git a/src/polypcore/sample-util.c b/src/polypcore/sample-util.c
index e3bb4aa6..e588446d 100644
--- a/src/polypcore/sample-util.c
+++ b/src/polypcore/sample-util.c
@@ -34,6 +34,15 @@
#include "sample-util.h"
+pa_memblock *pa_silence_memblock_new(const pa_sample_spec *spec, size_t length, pa_memblock_stat*s) {
+ assert(spec);
+
+ if (length == 0)
+ length = pa_bytes_per_second(spec)/10; /* 100 ms */
+
+ return pa_silence_memblock(pa_memblock_new(length, s), spec);
+}
+
pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec) {
assert(b && b->data && spec);
pa_silence_memory(b->data, b->length, spec);
diff --git a/src/polypcore/sample-util.h b/src/polypcore/sample-util.h
index 486d284b..7ea01a30 100644
--- a/src/polypcore/sample-util.h
+++ b/src/polypcore/sample-util.h
@@ -28,6 +28,7 @@
#include <polypcore/memchunk.h>
pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec);
+pa_memblock *pa_silence_memblock_new(const pa_sample_spec *spec, size_t length, pa_memblock_stat*s);
void pa_silence_memchunk(pa_memchunk *c, const pa_sample_spec *spec);
void pa_silence_memory(void *p, size_t length, const pa_sample_spec *spec);
diff --git a/src/polypcore/sink.c b/src/polypcore/sink.c
index 9bc478c3..1f374f5e 100644
--- a/src/polypcore/sink.c
+++ b/src/polypcore/sink.c
@@ -270,6 +270,8 @@ int pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {
result->memblock = pa_memblock_new(length, s->core->memblock_stat);
assert(result->memblock);
+/* pa_log("mixing %i\n", n); */
+
result->length = pa_mix(info, n, result->memblock->data, length, &s->sample_spec, &s->sw_volume);
result->index = 0;
}
diff --git a/src/polypcore/sink.h b/src/polypcore/sink.h
index 5fd9784f..fa120ebf 100644
--- a/src/polypcore/sink.h
+++ b/src/polypcore/sink.h
@@ -34,7 +34,7 @@ typedef struct pa_sink pa_sink;
#include <polypcore/source.h>
#include <polypcore/module.h>
-#define PA_MAX_INPUTS_PER_SINK 6
+#define PA_MAX_INPUTS_PER_SINK 32
typedef enum pa_sink_state {
PA_SINK_RUNNING,