diff options
author | Lennart Poettering <lennart@poettering.net> | 2007-07-25 14:46:40 +0000 |
---|---|---|
committer | Lennart Poettering <lennart@poettering.net> | 2007-07-25 14:46:40 +0000 |
commit | 068f5d5eef1cab3615f9899e0e458d59e54e95a2 (patch) | |
tree | b8ad9fd7fac81028220bf3cb561f7f8a85d2e994 | |
parent | 9cc20b46b7f8eba94f52a563e7781aff90274bef (diff) |
drop chunk argument from various drop() functions, since it doesn't make any sense if we want to guarantee always monotonously increasing read pointers; a couple of other fixes
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1529 fefdeb5f-60dc-0310-8127-8f9354f1896f
-rw-r--r-- | src/modules/module-sine.c | 7 | ||||
-rw-r--r-- | src/modules/rtp/rtp.c | 2 | ||||
-rw-r--r-- | src/pulse/stream.c | 2 | ||||
-rw-r--r-- | src/pulsecore/memblock.c | 6 | ||||
-rw-r--r-- | src/pulsecore/memblockq.c | 146 | ||||
-rw-r--r-- | src/pulsecore/memblockq.h | 13 | ||||
-rw-r--r-- | src/pulsecore/play-memblockq.c | 20 | ||||
-rw-r--r-- | src/pulsecore/play-memchunk.c | 29 | ||||
-rw-r--r-- | src/pulsecore/protocol-simple.c | 13 | ||||
-rw-r--r-- | src/pulsecore/sink-input.c | 88 | ||||
-rw-r--r-- | src/pulsecore/sink-input.h | 4 | ||||
-rw-r--r-- | src/pulsecore/sink.c | 2 | ||||
-rw-r--r-- | src/pulsecore/sound-file-stream.c | 214 | ||||
-rw-r--r-- | src/tests/memblockq-test.c | 16 |
14 files changed, 343 insertions, 219 deletions
diff --git a/src/modules/module-sine.c b/src/modules/module-sine.c index b5b7e60b..a784f218 100644 --- a/src/modules/module-sine.c +++ b/src/modules/module-sine.c @@ -73,14 +73,13 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { return 0; } -static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) { +static void sink_input_drop_cb(pa_sink_input *i, size_t length) { struct userdata *u; size_t l; pa_assert(i); u = i->userdata; pa_assert(u); - pa_assert(chunk); pa_assert(length > 0); u->peek_index += length; @@ -93,8 +92,10 @@ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_ static void sink_input_kill_cb(pa_sink_input *i) { struct userdata *u; - pa_assert(i && i->userdata); + + pa_assert(i); u = i->userdata; + pa_assert(u); pa_sink_input_disconnect(u->sink_input); pa_sink_input_unref(u->sink_input); diff --git a/src/modules/rtp/rtp.c b/src/modules/rtp/rtp.c index f0ab7d8a..31dec653 100644 --- a/src/modules/rtp/rtp.c +++ b/src/modules/rtp/rtp.c @@ -90,7 +90,7 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) { } skip += k; - pa_memblockq_drop(q, &chunk, k); + pa_memblockq_drop(q, k); } if (r < 0 || !chunk.memblock || n >= size || iov_idx >= MAX_IOVECS) { diff --git a/src/pulse/stream.c b/src/pulse/stream.c index 44fce52f..f18a5dde 100644 --- a/src/pulse/stream.c +++ b/src/pulse/stream.c @@ -700,7 +700,7 @@ int pa_stream_drop(pa_stream *s) { PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE); - pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length); + pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length); /* Fix the simulated local read index */ if (s->timing_info_valid && !s->timing_info.read_index_corrupt) diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c index 8da9cebb..c39147d1 100644 --- a/src/pulsecore/memblock.c +++ b/src/pulsecore/memblock.c @@ -150,7 +150,7 @@ struct pa_mempool { static void segment_detach(pa_memimport_segment *seg); -PA_STATIC_FLIST_DECLARE(unused_memblocks, 0); +PA_STATIC_FLIST_DECLARE(unused_memblocks, 0, pa_xfree); /* No lock necessary */ static void stat_add(pa_memblock*b) { @@ -670,8 +670,8 @@ void pa_mempool_free(pa_mempool *p) { pa_mutex_unlock(p->mutex); if (pa_atomic_load(&p->stat.n_allocated) > 0) { - raise(SIGTRAP); - pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed!"); +/* raise(SIGTRAP); */ + pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed! %u remain.", pa_atomic_load(&p->stat.n_allocated)); } pa_flist_free(p->free_slots, NULL); diff --git a/src/pulsecore/memblockq.c b/src/pulsecore/memblockq.c index 0c31166a..ecdf45b6 100644 --- a/src/pulsecore/memblockq.c +++ b/src/pulsecore/memblockq.c @@ -36,21 +36,24 @@ #include <pulsecore/log.h> #include <pulsecore/mcalign.h> #include <pulsecore/macro.h> +#include <pulsecore/flist.h> #include "memblockq.h" -struct memblock_list { - struct memblock_list *next, *prev; +struct list_item { + struct list_item *next, *prev; int64_t index; pa_memchunk chunk; }; +PA_STATIC_FLIST_DECLARE(list_items, 0, pa_xfree); + struct pa_memblockq { - struct memblock_list *blocks, *blocks_tail; + struct list_item *blocks, *blocks_tail; unsigned n_blocks; size_t maxlength, tlength, base, prebuf, minreq; int64_t read_index, write_index; - enum { PREBUF, RUNNING } state; + int in_prebuf; pa_memblock *silence; pa_mcalign *mcalign; }; @@ -77,13 +80,13 @@ pa_memblockq* pa_memblockq_new( bq->read_index = bq->write_index = idx; pa_log_debug("memblockq requested: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu", - (unsigned long)maxlength, (unsigned long)tlength, (unsigned long)base, (unsigned long)prebuf, (unsigned long)minreq); + (unsigned long) maxlength, (unsigned long) tlength, (unsigned long) base, (unsigned long) prebuf, (unsigned long) minreq); bq->maxlength = ((maxlength+base-1)/base)*base; pa_assert(bq->maxlength >= base); bq->tlength = ((tlength+base-1)/base)*base; - if (!bq->tlength || bq->tlength >= bq->maxlength) + if (bq->tlength <= 0 || bq->tlength > bq->maxlength) bq->tlength = bq->maxlength; bq->prebuf = (prebuf == (size_t) -1) ? bq->tlength/2 : prebuf; @@ -102,7 +105,7 @@ pa_memblockq* pa_memblockq_new( pa_log_debug("memblockq sanitized: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu", (unsigned long)bq->maxlength, (unsigned long)bq->tlength, (unsigned long)bq->base, (unsigned long)bq->prebuf, (unsigned long)bq->minreq); - bq->state = bq->prebuf ? PREBUF : RUNNING; + bq->in_prebuf = bq->prebuf > 0; bq->silence = silence ? pa_memblock_ref(silence) : NULL; bq->mcalign = NULL; @@ -113,7 +116,7 @@ void pa_memblockq_free(pa_memblockq* bq) { pa_assert(bq); pa_memblockq_flush(bq); - + if (bq->silence) pa_memblock_unref(bq->silence); @@ -123,7 +126,7 @@ void pa_memblockq_free(pa_memblockq* bq) { pa_xfree(bq); } -static void drop_block(pa_memblockq *bq, struct memblock_list *q) { +static void drop_block(pa_memblockq *bq, struct list_item *q) { pa_assert(bq); pa_assert(q); @@ -140,7 +143,9 @@ static void drop_block(pa_memblockq *bq, struct memblock_list *q) { bq->blocks_tail = q->prev; pa_memblock_unref(q->chunk.memblock); - pa_xfree(q); + + if (pa_flist_push(PA_STATIC_FLIST_GET(list_items), q) < 0) + pa_xfree(q); bq->n_blocks--; } @@ -171,7 +176,7 @@ static int can_push(pa_memblockq *bq, size_t l) { int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) { - struct memblock_list *q, *n; + struct list_item *q, *n; pa_memchunk chunk; pa_assert(bq); @@ -198,7 +203,7 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) { if (chunk.length > d) { chunk.index += d; chunk.length -= d; - bq->write_index = bq->read_index; + bq->write_index += d; } else { /* We drop the incoming data completely */ bq->write_index += chunk.length; @@ -212,10 +217,10 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) { q = bq->blocks_tail; while (q) { - if (bq->write_index >= q->index + (int64_t)q->chunk.length) + if (bq->write_index >= q->index + (int64_t) q->chunk.length) /* We found the entry where we need to place the new entry immediately after */ break; - else if (bq->write_index + (int64_t)chunk.length <= q->index) { + else if (bq->write_index + (int64_t) chunk.length <= q->index) { /* This entry isn't touched at all, let's skip it */ q = q->prev; } else if (bq->write_index <= q->index && @@ -223,7 +228,7 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) { /* This entry is fully replaced by the new entry, so let's drop it */ - struct memblock_list *p; + struct list_item *p; p = q; q = q->prev; drop_block(bq, p); @@ -234,11 +239,13 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) { if (bq->write_index + chunk.length < q->index + q->chunk.length) { /* We need to save the end of this memchunk */ - struct memblock_list *p; + struct list_item *p; size_t d; /* Create a new list entry for the end of thie memchunk */ - p = pa_xnew(struct memblock_list, 1); + if (!(p = pa_flist_pop(PA_STATIC_FLIST_GET(list_items)))) + p = pa_xnew(struct list_item, 1); + p->chunk = q->chunk; pa_memblock_ref(p->chunk.memblock); @@ -263,7 +270,7 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) { /* Truncate the chunk */ if (!(q->chunk.length = bq->write_index - q->index)) { - struct memblock_list *p; + struct list_item *p; p = q; q = q->prev; drop_block(bq, p); @@ -287,7 +294,6 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) { q = q->prev; } - } if (q) { @@ -308,7 +314,9 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) { pa_assert(!bq->blocks || (bq->write_index + (int64_t)chunk.length <= bq->blocks->index)); - n = pa_xnew(struct memblock_list, 1); + if (!(n = pa_flist_pop(PA_STATIC_FLIST_GET(list_items)))) + n = pa_xnew(struct list_item, 1); + n->chunk = chunk; pa_memblock_ref(n->chunk.memblock); n->index = bq->write_index; @@ -331,24 +339,34 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) { return 0; } -int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) { +static int memblockq_check_prebuf(pa_memblockq *bq) { pa_assert(bq); - pa_assert(chunk); + + if (bq->in_prebuf) { + + if (pa_memblockq_get_length(bq) < bq->prebuf) + return 1; - if (bq->state == PREBUF) { + bq->in_prebuf = 0; + return 0; + } else { - /* We need to pre-buffer */ - if (pa_memblockq_get_length(bq) < bq->prebuf) - return -1; + if (bq->prebuf > 0 && bq->read_index >= bq->write_index) { + bq->in_prebuf = 1; + return 1; + } - bq->state = RUNNING; + return 0; + } +} - } else if (bq->prebuf > 0 && bq->read_index >= bq->write_index) { +int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) { + pa_assert(bq); + pa_assert(chunk); - /* Buffer underflow protection */ - bq->state = PREBUF; + /* We need to pre-buffer */ + if (memblockq_check_prebuf(bq)) return -1; - } /* Do we need to spit out silence? */ if (!bq->blocks || bq->blocks->index > bq->read_index) { @@ -390,43 +408,16 @@ int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) { return 0; } -void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length) { +void pa_memblockq_drop(pa_memblockq *bq, size_t length) { pa_assert(bq); pa_assert(length % bq->base == 0); - pa_assert(!chunk || length <= chunk->length); - - if (chunk) { - - if (bq->blocks && bq->blocks->index == bq->read_index) { - /* The first item in queue is valid */ - - /* Does the chunk match with what the user supplied us? */ - if (memcmp(chunk, &bq->blocks->chunk, sizeof(pa_memchunk)) != 0) - return; - - } else { - size_t l; - - /* The first item in the queue is not yet relevant */ - - pa_assert(!bq->blocks || bq->blocks->index > bq->read_index); - l = bq->blocks ? bq->blocks->index - bq->read_index : 0; - - if (bq->silence) { - - if (!l || l > pa_memblock_get_length(bq->silence)) - l = pa_memblock_get_length(bq->silence); - - } - - /* Do the entries still match? */ - if (chunk->index != 0 || chunk->length != l || chunk->memblock != bq->silence) - return; - } - } - + while (length > 0) { + /* Do not drop any data when we are in prebuffering mode */ + if (memblockq_check_prebuf(bq)) + break; + if (bq->blocks) { size_t d; @@ -476,15 +467,11 @@ void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length int pa_memblockq_is_readable(pa_memblockq *bq) { pa_assert(bq); - if (bq->prebuf > 0) { - size_t l = pa_memblockq_get_length(bq); - - if (bq->state == PREBUF && l < bq->prebuf) - return 0; + if (memblockq_check_prebuf(bq)) + return 0; - if (l <= 0) - return 0; - } + if (pa_memblockq_get_length(bq) <= 0) + return 0; return 1; } @@ -506,7 +493,7 @@ size_t pa_memblockq_missing(pa_memblockq *bq) { return 0; l = bq->tlength - l; - return (l >= bq->minreq) ? l : 0; + return l >= bq->minreq ? l : 0; } size_t pa_memblockq_get_minreq(pa_memblockq *bq) { @@ -529,7 +516,7 @@ void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek) { bq->write_index = bq->read_index + offset; return; case PA_SEEK_RELATIVE_END: - bq->write_index = (bq->blocks_tail ? bq->blocks_tail->index + (int64_t)bq->blocks_tail->chunk.length : bq->read_index) + offset; + bq->write_index = (bq->blocks_tail ? bq->blocks_tail->index + (int64_t) bq->blocks_tail->chunk.length : bq->read_index) + offset; return; } @@ -569,7 +556,7 @@ int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk) { pa_memchunk rchunk; pa_assert(bq); - pa_assert(chunk && bq->base); + pa_assert(chunk); if (bq->base == 1) return pa_memblockq_push(bq, chunk); @@ -601,21 +588,20 @@ void pa_memblockq_shorten(pa_memblockq *bq, size_t length) { l = pa_memblockq_get_length(bq); if (l > length) - pa_memblockq_drop(bq, NULL, l - length); + pa_memblockq_drop(bq, l - length); } void pa_memblockq_prebuf_disable(pa_memblockq *bq) { pa_assert(bq); - if (bq->state == PREBUF) - bq->state = RUNNING; + bq->in_prebuf = 0; } void pa_memblockq_prebuf_force(pa_memblockq *bq) { pa_assert(bq); - if (bq->state == RUNNING && bq->prebuf > 0) - bq->state = PREBUF; + if (!bq->in_prebuf && bq->prebuf > 0) + bq->in_prebuf = 1; } size_t pa_memblockq_get_maxlength(pa_memblockq *bq) { diff --git a/src/pulsecore/memblockq.h b/src/pulsecore/memblockq.h index e8243568..5eb23aac 100644 --- a/src/pulsecore/memblockq.h +++ b/src/pulsecore/memblockq.h @@ -83,13 +83,16 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *chunk); * you know what you do. */ int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk); -/* Return a copy of the next memory chunk in the queue. It is not removed from the queue */ +/* Return a copy of the next memory chunk in the queue. It is not + * removed from the queue. There are two reasons this function might + * fail: 1. prebuffering is active, 2. queue is empty and no silence + * memblock was passed at initialization. If the queue is not empty, + * but we're currently at a hole in the queue and no silence memblock + * was passed we return the length of the hole in chunk->length. */ int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk); -/* Drop the specified bytes from the queue, but only if the first - * chunk in the queue matches the one passed here. If NULL is passed, - * this check isn't done. */ -void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length); +/* Drop the specified bytes from the queue. */ +void pa_memblockq_drop(pa_memblockq *bq, size_t length); /* Test if the pa_memblockq is currently readable, that is, more data than base */ int pa_memblockq_is_readable(pa_memblockq *bq); diff --git a/src/pulsecore/play-memblockq.c b/src/pulsecore/play-memblockq.c index 9c5945af..51ea22e8 100644 --- a/src/pulsecore/play-memblockq.c +++ b/src/pulsecore/play-memblockq.c @@ -37,7 +37,7 @@ #include "play-memblockq.h" -static void sink_input_kill(pa_sink_input *i) { +static void sink_input_kill_cb(pa_sink_input *i) { pa_memblockq *q; assert(i); assert(i->userdata); @@ -50,7 +50,7 @@ static void sink_input_kill(pa_sink_input *i) { pa_memblockq_free(q); } -static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) { +static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { pa_memblockq *q; assert(i); assert(chunk); @@ -61,11 +61,11 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) { return pa_memblockq_peek(q, chunk); } -static void si_kill(PA_GCC_UNUSED pa_mainloop_api *m, void *i) { - sink_input_kill(i); +static void si_kill_cb(PA_GCC_UNUSED pa_mainloop_api *m, void *i) { + sink_input_kill_cb(i); } -static void sink_input_drop(pa_sink_input *i, const pa_memchunk*chunk, size_t length) { +static void sink_input_drop_cb(pa_sink_input *i, size_t length) { pa_memblockq *q; assert(i); @@ -74,10 +74,10 @@ static void sink_input_drop(pa_sink_input *i, const pa_memchunk*chunk, size_t le q = i->userdata; - pa_memblockq_drop(q, chunk, length); + pa_memblockq_drop(q, length); if (pa_memblockq_get_length(q) <= 0) - pa_mainloop_api_once(i->sink->core->mainloop, si_kill, i); + pa_mainloop_api_once(i->sink->core->mainloop, si_kill_cb, i); } int pa_play_memblockq( @@ -116,9 +116,9 @@ int pa_play_memblockq( if (!(si = pa_sink_input_new(sink->core, &data, 0))) return -1; - si->peek = sink_input_peek; - si->drop = sink_input_drop; - si->kill = sink_input_kill; + si->peek = sink_input_peek_cb; + si->drop = sink_input_drop_cb; + si->kill = sink_input_kill_cb; si->userdata = q; diff --git a/src/pulsecore/play-memchunk.c b/src/pulsecore/play-memchunk.c index 65b6e825..7e750baa 100644 --- a/src/pulsecore/play-memchunk.c +++ b/src/pulsecore/play-memchunk.c @@ -37,7 +37,7 @@ #include "play-memchunk.h" -static void sink_input_kill(pa_sink_input *i) { +static void sink_input_kill_cb(pa_sink_input *i) { pa_memchunk *c; assert(i && i->userdata); c = i->userdata; @@ -49,7 +49,7 @@ static void sink_input_kill(pa_sink_input *i) { pa_xfree(c); } -static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) { +static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { pa_memchunk *c; assert(i && chunk && i->userdata); c = i->userdata; @@ -64,23 +64,24 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) { return 0; } -static void si_kill(PA_GCC_UNUSED pa_mainloop_api *m, void *i) { - sink_input_kill(i); +static void si_kill_cb(PA_GCC_UNUSED pa_mainloop_api *m, void *i) { + sink_input_kill_cb(i); } -static void sink_input_drop(pa_sink_input *i, const pa_memchunk*chunk, size_t length) { +static void sink_input_drop_cb(pa_sink_input *i, size_t length) { pa_memchunk *c; assert(i && length && i->userdata); c = i->userdata; - assert(!memcmp(chunk, c, sizeof(chunk))); - assert(length <= c->length); + if (length >= c->length) { + c->length -= length; + c->index += length; + } else { - c->length -= length; - c->index += length; + c->length = 0; - if (c->length <= 0) - pa_mainloop_api_once(i->sink->core->mainloop, si_kill, i); + pa_mainloop_api_once(i->sink->core->mainloop, si_kill_cb, i); + } } int pa_play_memchunk( @@ -113,9 +114,9 @@ int pa_play_memchunk( if (!(si = pa_sink_input_new(sink->core, &data, 0))) return -1; - si->peek = sink_input_peek; - si->drop = sink_input_drop; - si->kill = sink_input_kill; + si->peek = sink_input_peek_cb; + si->drop = sink_input_drop_cb; + si->kill = sink_input_kill_cb; si->userdata = nchunk = pa_xnew(pa_memchunk, 1); *nchunk = *chunk; diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index c423487a..8f9aed58 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -67,7 +67,6 @@ typedef struct connection { PA_DECLARE_CLASS(connection); #define CONNECTION(o) (connection_cast(o)) - static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobject_check_type); struct pa_protocol_simple { @@ -230,7 +229,7 @@ static int do_write(connection *c) { return -1; } - pa_memblockq_drop(c->output_memblockq, &chunk, r); + pa_memblockq_drop(c->output_memblockq, r); return 0; } @@ -271,7 +270,6 @@ fail: static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) { connection *c = CONNECTION(o); - connection_assert_ref(c); switch (code) { @@ -351,13 +349,13 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { /* pa_log("peeked %u %i", r >= 0 ? chunk->length: 0, r); */ if (c->dead && r < 0) - pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, c, NULL, NULL); + pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, NULL, NULL, NULL); return r; } /* Called from thread context */ -static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) { +static void sink_input_drop_cb(pa_sink_input *i, size_t length) { connection*c = i->userdata; size_t old, new; @@ -366,7 +364,7 @@ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_ pa_assert(length); old = pa_memblockq_missing(c->input_memblockq); - pa_memblockq_drop(c->input_memblockq, chunk, length); + pa_memblockq_drop(c->input_memblockq, length); new = pa_memblockq_missing(c->input_memblockq); if (new > old) { @@ -378,9 +376,8 @@ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_ /* Called from main context */ static void sink_input_kill_cb(pa_sink_input *i) { pa_assert(i); - pa_assert(i->userdata); - connection_drop((connection *) i->userdata); + connection_drop(CONNECTION(i->userdata)); } /*** source_output callbacks ***/ diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index d27f00f0..db98dd54 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -341,7 +341,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) /* } */ if (!i->thread_info.resampler) { - do_volume_adj_here = 0; + do_volume_adj_here = 0; /* FIXME??? */ ret = i->peek(i, chunk); goto finish; } @@ -356,15 +356,14 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) if ((ret = i->peek(i, &tchunk)) < 0) goto finish; - pa_assert(tchunk.length); + pa_assert(tchunk.length > 0); l = pa_resampler_request(i->thread_info.resampler, CONVERT_BUFFER_LENGTH); - if (l > tchunk.length) - l = tchunk.length; + if (tchunk.length > l) + tchunk.length = l; - i->drop(i, &tchunk, l); - tchunk.length = l; + i->drop(i, tchunk.length); /* It might be necessary to adjust the volume here */ if (do_volume_adj_here && !volume_is_norm) { @@ -377,7 +376,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume) } pa_assert(i->thread_info.resampled_chunk.memblock); - pa_assert(i->thread_info.resampled_chunk.length); + pa_assert(i->thread_info.resampled_chunk.length > 0); *chunk = i->thread_info.resampled_chunk; pa_memblock_ref(i->thread_info.resampled_chunk.memblock); @@ -409,7 +408,7 @@ finish: return ret; } -void pa_sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t length) { +void pa_sink_input_drop(pa_sink_input *i, size_t length) { pa_sink_input_assert_ref(i); pa_assert(length > 0); @@ -440,22 +439,67 @@ void pa_sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t lengt /* return; */ /* } */ - if (!i->thread_info.resampler) { - if (i->drop) - i->drop(i, chunk, length); - return; - } - - pa_assert(i->thread_info.resampled_chunk.memblock); - pa_assert(i->thread_info.resampled_chunk.length >= length); + pa_log("dropping %u", length); + + if (i->thread_info.resampled_chunk.memblock) { + size_t l = length; + + if (l > i->thread_info.resampled_chunk.length) + l = i->thread_info.resampled_chunk.length; + + pa_log("really dropping %u", l); + + i->thread_info.resampled_chunk.index += l; + i->thread_info.resampled_chunk.length -= l; + + if (i->thread_info.resampled_chunk.length <= 0) { + pa_memblock_unref(i->thread_info.resampled_chunk.memblock); + pa_memchunk_reset(&i->thread_info.resampled_chunk); + } - i->thread_info.resampled_chunk.index += length; - i->thread_info.resampled_chunk.length -= length; + length -= l; + } - if (i->thread_info.resampled_chunk.length <= 0) { - pa_memblock_unref(i->thread_info.resampled_chunk.memblock); - i->thread_info.resampled_chunk.memblock = NULL; - i->thread_info.resampled_chunk.index = i->thread_info.resampled_chunk.length = 0; + pa_log("really remaining %u", length); + + if (length > 0) { + + if (i->thread_info.resampler) { + /* So, we have a resampler. To avoid discontinuities we + * have to actually read all data that could be read and + * pass it through the resampler. */ + + while (length > 0) { + pa_memchunk chunk; + pa_cvolume volume; + + if (pa_sink_input_peek(i, &chunk, &volume) >= 0) { + size_t l = chunk.length; + + if (l > length) + l = length; + + pa_sink_input_drop(i, l); + length -= l; + + } else { + /* Hmmm, peeking failed, so let's at least drop + * the right amount of data */ + + if (i->drop) + i->drop(i, pa_resampler_request(i->thread_info.resampler, length)); + + break; + } + } + + } else { + + /* We have no resampler, hence let's just drop the data */ + + if (i->drop) + i->drop(i, length); + } } } diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h index 426e48c0..fe62917a 100644 --- a/src/pulsecore/sink-input.h +++ b/src/pulsecore/sink-input.h @@ -75,7 +75,7 @@ struct pa_sink_input { int muted; int (*peek) (pa_sink_input *i, pa_memchunk *chunk); - void (*drop) (pa_sink_input *i, const pa_memchunk *chunk, size_t length); + void (*drop) (pa_sink_input *i, size_t length); void (*kill) (pa_sink_input *i); /* may be NULL */ pa_usec_t (*get_latency) (pa_sink_input *i); /* may be NULL */ void (*underrun) (pa_sink_input *i); /* may be NULL */ @@ -178,7 +178,7 @@ pa_sink_input_state_t pa_sink_input_get_state(pa_sink_input *i); /* To be used exclusively by the sink driver thread */ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume); -void pa_sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t length); +void pa_sink_input_drop(pa_sink_input *i, size_t length); int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk); #endif diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 5a79a41c..a66097bc 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -326,7 +326,7 @@ static void inputs_drop(pa_sink *s, pa_mix_info *info, unsigned n, size_t length } /* Drop read data */ - pa_sink_input_drop(i, m ? &m->chunk : NULL, length); + pa_sink_input_drop(i, length); if (m) { pa_sink_input_unref(m->userdata); diff --git a/src/pulsecore/sound-file-stream.c b/src/pulsecore/sound-file-stream.c index 974c053a..c31187c5 100644 --- a/src/pulsecore/sound-file-stream.c +++ b/src/pulsecore/sound-file-stream.c @@ -26,7 +26,6 @@ #endif #include <stdlib.h> -#include <assert.h> #include <stdio.h> #include <string.h> @@ -41,89 +40,177 @@ #define BUF_SIZE (1024*10) -struct userdata { +typedef struct file_stream { + pa_msgobject parent; + pa_core *core; SNDFILE *sndfile; pa_sink_input *sink_input; pa_memchunk memchunk; sf_count_t (*readf_function)(SNDFILE *sndfile, void *ptr, sf_count_t frames); + size_t drop; +} file_stream; + +enum { + MESSAGE_DROP_FILE_STREAM }; -static void free_userdata(struct userdata *u) { - assert(u); - if (u->sink_input) { - pa_sink_input_disconnect(u->sink_input); - pa_sink_input_unref(u->sink_input); - } +PA_DECLARE_CLASS(file_stream); +#define FILE_STREAM(o) (file_stream_cast(o)) +static PA_DEFINE_CHECK_TYPE(file_stream, file_stream_check_type, pa_msgobject_check_type); + +static void file_stream_free(pa_object *o) { + file_stream *u = FILE_STREAM(o); + pa_assert(u); + pa_log("xxxx ffreee"); + if (u->memchunk.memblock) pa_memblock_unref(u->memchunk.memblock); + if (u->sndfile) sf_close(u->sndfile); pa_xfree(u); } -static void sink_input_kill(pa_sink_input *i) { - assert(i && i->userdata); - free_userdata(i->userdata); -} - -static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) { - struct userdata *u; - assert(i && chunk && i->userdata); - u = i->userdata; +static void file_stream_drop(file_stream *u) { + file_stream_assert_ref(u); - if (!u->memchunk.memblock) { - uint32_t fs = pa_frame_size(&i->sample_spec); - sf_count_t n; - void *p; + pa_log("xxxx drop"); + + + if (u->sink_input) { + pa_sink_input_disconnect(u->sink_input); + pa_sink_input_unref(u->sink_input); + u->sink_input = NULL; - u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE); - u->memchunk.index = 0; + /* Make sure we don't decrease the ref count twice. */ + file_stream_unref(u); + } +} - p = pa_memblock_acquire(u->memchunk.memblock); +static int file_stream_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) { + file_stream *u = FILE_STREAM(o); + file_stream_assert_ref(u); + + switch (code) { + case MESSAGE_DROP_FILE_STREAM: + file_stream_drop(u); + break; + } - if (u->readf_function) { - if ((n = u->readf_function(u->sndfile, p, BUF_SIZE/fs)) <= 0) - n = 0; + return 0; +} - u->memchunk.length = n * fs; - } else { - if ((n = sf_read_raw(u->sndfile, p, BUF_SIZE)) <= 0) - n = 0; +static void sink_input_kill_cb(pa_sink_input *i) { + pa_assert(i); + + file_stream_drop(FILE_STREAM(i->userdata)); +} - u->memchunk.length = n; +static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { + file_stream *u; + + pa_assert(i); + pa_assert(chunk); + u = FILE_STREAM(i->userdata); + file_stream_assert_ref(u); + + for (;;) { + + if (!u->memchunk.memblock) { + + u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE); + u->memchunk.index = 0; + + if (u->readf_function) { + sf_count_t n; + void *p; + size_t fs = pa_frame_size(&i->sample_spec); + + p = pa_memblock_acquire(u->memchunk.memblock); + n = u->readf_function(u->sndfile, p, BUF_SIZE/fs); + pa_memblock_release(u->memchunk.memblock); + + pa_log("%u/%u = data: %02x %02x %02x %02x %02x %02x %02x %02x", + (unsigned int) n, BUF_SIZE/fs, + ((uint8_t*)p)[0], ((uint8_t*)p)[1], ((uint8_t*)p)[2], ((uint8_t*)p)[3], + ((uint8_t*)p)[4], ((uint8_t*)p)[5], ((uint8_t*)p)[6], ((uint8_t*)p)[7]); + + if (n <= 0) + n = 0; + + u->memchunk.length = n * fs; + } else { + sf_count_t n; + void *p; + + p = pa_memblock_acquire(u->memchunk.memblock); + n = sf_read_raw(u->sndfile, p, BUF_SIZE); + pa_memblock_release(u->memchunk.memblock); + + if (n <= 0) + n = 0; + + u->memchunk.length = n; + } + + if (u->memchunk.length <= 0) { + + pa_memblock_unref(u->memchunk.memblock); + pa_memchunk_reset(&u->memchunk); + + pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u), MESSAGE_DROP_FILE_STREAM, NULL, NULL, NULL); + return -1; + } } - pa_memblock_release(u->memchunk.memblock); - if (!u->memchunk.length) { - free_userdata(u); - return -1; + pa_assert(u->memchunk.memblock); + pa_assert(u->memchunk.length > 0); + + if (u->drop < u->memchunk.length) { + u->memchunk.index += u->drop; + u->memchunk.length -= u->drop; + u->drop = 0; + break; } + + u->drop -= u->memchunk.length; + pa_memblock_unref(u->memchunk.memblock); + pa_memchunk_reset(&u->memchunk); } *chunk = u->memchunk; pa_memblock_ref(chunk->memblock); - assert(chunk->length); + + pa_assert(chunk->length > 0); + pa_assert(u->drop <= 0); + return 0; } -static void sink_input_drop(pa_sink_input *i, const pa_memchunk*chunk, size_t length) { - struct userdata *u; - assert(i && chunk && length && i->userdata); - u = i->userdata; +static void sink_input_drop_cb(pa_sink_input *i, size_t length) { + file_stream *u; - assert(!memcmp(chunk, &u->memchunk, sizeof(chunk))); - assert(length <= u->memchunk.length); + pa_assert(i); + pa_assert(length > 0); + u = FILE_STREAM(i->userdata); + file_stream_assert_ref(u); + + if (u->memchunk.memblock) { - u->memchunk.index += length; - u->memchunk.length -= length; + if (length < u->memchunk.length) { + u->memchunk.index += length; + u->memchunk.length -= length; + return; + } - if (u->memchunk.length <= 0) { + length -= u->memchunk.length; pa_memblock_unref(u->memchunk.memblock); - u->memchunk.memblock = NULL; - u->memchunk.index = u->memchunk.length = 0; + pa_memchunk_reset(&u->memchunk); } + + u->drop += length; } int pa_play_file( @@ -131,19 +218,23 @@ int pa_play_file( const char *fname, const pa_cvolume *volume) { - struct userdata *u = NULL; + file_stream *u = NULL; SF_INFO sfinfo; pa_sample_spec ss; pa_sink_input_new_data data; - assert(sink); - assert(fname); + pa_assert(sink); + pa_assert(fname); - u = pa_xnew(struct userdata, 1); + u = pa_msgobject_new(file_stream, file_stream_check_type); + u->parent.parent.free = file_stream_free; + u->parent.process_msg = file_stream_process_msg; + u->core = sink->core; u->sink_input = NULL; - u->memchunk.memblock = NULL; - u->memchunk.index = u->memchunk.length = 0; + pa_memchunk_reset(&u->memchunk); u->sndfile = NULL; + u->readf_function = NULL; + u->drop = 0; memset(&sfinfo, 0, sizeof(sfinfo)); @@ -152,8 +243,6 @@ int pa_play_file( goto fail; } - u->readf_function = NULL; - switch (sfinfo.format & 0xFF) { case SF_FORMAT_PCM_16: case SF_FORMAT_PCM_U8: @@ -195,18 +284,21 @@ int pa_play_file( if (!(u->sink_input = pa_sink_input_new(sink->core, &data, 0))) goto fail; - u->sink_input->peek = sink_input_peek; - u->sink_input->drop = sink_input_drop; - u->sink_input->kill = sink_input_kill; + u->sink_input->peek = sink_input_peek_cb; + u->sink_input->drop = sink_input_drop_cb; + u->sink_input->kill = sink_input_kill_cb; u->sink_input->userdata = u; -/* pa_sink_notify(u->sink_input->sink); */ + pa_sink_input_put(u->sink_input); + + /* The reference to u is dangling here, because we want to keep + * this stream around until it is fully played. */ return 0; fail: if (u) - free_userdata(u); + file_stream_unref(u); return -1; } diff --git a/src/tests/memblockq-test.c b/src/tests/memblockq-test.c index 7ad3b2f3..25ea399b 100644 --- a/src/tests/memblockq-test.c +++ b/src/tests/memblockq-test.c @@ -26,6 +26,7 @@ #include <stdlib.h> #include <assert.h> #include <stdio.h> +#include <signal.h> #include <pulsecore/memblockq.h> #include <pulsecore/log.h> @@ -48,22 +49,22 @@ int main(int argc, char *argv[]) { bq = pa_memblockq_new(0, 40, 10, 2, 4, 4, silence); assert(bq); - chunk1.memblock = pa_memblock_new_fixed(p, (char*) "AA", 2, 1); + chunk1.memblock = pa_memblock_new_fixed(p, (char*) "11", 2, 1); chunk1.index = 0; chunk1.length = 2; assert(chunk1.memblock); - chunk2.memblock = pa_memblock_new_fixed(p, (char*) "TTBB", 4, 1); + chunk2.memblock = pa_memblock_new_fixed(p, (char*) "XX22", 4, 1); chunk2.index = 2; chunk2.length = 2; assert(chunk2.memblock); - chunk3.memblock = pa_memblock_new_fixed(p, (char*) "ZZZZ", 4, 1); + chunk3.memblock = pa_memblock_new_fixed(p, (char*) "3333", 4, 1); chunk3.index = 0; chunk3.length = 4; assert(chunk3.memblock); - chunk4.memblock = pa_memblock_new_fixed(p, (char*) "KKKKKKKK", 8, 1); + chunk4.memblock = pa_memblock_new_fixed(p, (char*) "44444444", 8, 1); chunk4.index = 0; chunk4.length = 8; assert(chunk4.memblock); @@ -115,13 +116,12 @@ int main(int argc, char *argv[]) { chunk3.index += 2; chunk3.length -= 2; - ret = pa_memblockq_push(bq, &chunk3); assert(ret == 0); - printf(">"); + pa_memblockq_shorten(bq, pa_memblockq_get_length(bq)-2); - pa_memblockq_shorten(bq, 6); + printf(">"); for (;;) { pa_memchunk out; @@ -137,7 +137,7 @@ int main(int argc, char *argv[]) { pa_memblock_release(out.memblock); pa_memblock_unref(out.memblock); - pa_memblockq_drop(bq, &out, out.length); + pa_memblockq_drop(bq, out.length); } printf("<\n"); |