diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/modules/module-sine.c | 7 | ||||
| -rw-r--r-- | src/modules/rtp/rtp.c | 2 | ||||
| -rw-r--r-- | src/pulse/stream.c | 2 | ||||
| -rw-r--r-- | src/pulsecore/memblock.c | 6 | ||||
| -rw-r--r-- | src/pulsecore/memblockq.c | 146 | ||||
| -rw-r--r-- | src/pulsecore/memblockq.h | 13 | ||||
| -rw-r--r-- | src/pulsecore/play-memblockq.c | 20 | ||||
| -rw-r--r-- | src/pulsecore/play-memchunk.c | 29 | ||||
| -rw-r--r-- | src/pulsecore/protocol-simple.c | 13 | ||||
| -rw-r--r-- | src/pulsecore/sink-input.c | 88 | ||||
| -rw-r--r-- | src/pulsecore/sink-input.h | 4 | ||||
| -rw-r--r-- | src/pulsecore/sink.c | 2 | ||||
| -rw-r--r-- | src/pulsecore/sound-file-stream.c | 214 | ||||
| -rw-r--r-- | src/tests/memblockq-test.c | 16 | 
14 files changed, 343 insertions, 219 deletions
| diff --git a/src/modules/module-sine.c b/src/modules/module-sine.c index b5b7e60b..a784f218 100644 --- a/src/modules/module-sine.c +++ b/src/modules/module-sine.c @@ -73,14 +73,13 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {      return 0;  } -static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) { +static void sink_input_drop_cb(pa_sink_input *i, size_t length) {      struct userdata *u;      size_t l;      pa_assert(i);      u = i->userdata;      pa_assert(u); -    pa_assert(chunk);      pa_assert(length > 0);      u->peek_index += length; @@ -93,8 +92,10 @@ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_  static void sink_input_kill_cb(pa_sink_input *i) {      struct userdata *u; -    pa_assert(i && i->userdata); +     +    pa_assert(i);      u = i->userdata; +    pa_assert(u);      pa_sink_input_disconnect(u->sink_input);      pa_sink_input_unref(u->sink_input); diff --git a/src/modules/rtp/rtp.c b/src/modules/rtp/rtp.c index f0ab7d8a..31dec653 100644 --- a/src/modules/rtp/rtp.c +++ b/src/modules/rtp/rtp.c @@ -90,7 +90,7 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {              }              skip += k; -            pa_memblockq_drop(q, &chunk, k); +            pa_memblockq_drop(q, k);          }          if (r < 0 || !chunk.memblock || n >= size || iov_idx >= MAX_IOVECS) { diff --git a/src/pulse/stream.c b/src/pulse/stream.c index 44fce52f..f18a5dde 100644 --- a/src/pulse/stream.c +++ b/src/pulse/stream.c @@ -700,7 +700,7 @@ int pa_stream_drop(pa_stream *s) {      PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);      PA_CHECK_VALIDITY(s->context, s->peek_memchunk.memblock, PA_ERR_BADSTATE); -    pa_memblockq_drop(s->record_memblockq, &s->peek_memchunk, s->peek_memchunk.length); +    pa_memblockq_drop(s->record_memblockq, s->peek_memchunk.length);      /* Fix the simulated local read index */      if (s->timing_info_valid && !s->timing_info.read_index_corrupt) diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c index 8da9cebb..c39147d1 100644 --- a/src/pulsecore/memblock.c +++ b/src/pulsecore/memblock.c @@ -150,7 +150,7 @@ struct pa_mempool {  static void segment_detach(pa_memimport_segment *seg); -PA_STATIC_FLIST_DECLARE(unused_memblocks, 0); +PA_STATIC_FLIST_DECLARE(unused_memblocks, 0, pa_xfree);  /* No lock necessary */  static void stat_add(pa_memblock*b) { @@ -670,8 +670,8 @@ void pa_mempool_free(pa_mempool *p) {      pa_mutex_unlock(p->mutex);      if (pa_atomic_load(&p->stat.n_allocated) > 0) { -        raise(SIGTRAP); -        pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed!"); +/*         raise(SIGTRAP);  */ +        pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed! %u remain.", pa_atomic_load(&p->stat.n_allocated));      }      pa_flist_free(p->free_slots, NULL); diff --git a/src/pulsecore/memblockq.c b/src/pulsecore/memblockq.c index 0c31166a..ecdf45b6 100644 --- a/src/pulsecore/memblockq.c +++ b/src/pulsecore/memblockq.c @@ -36,21 +36,24 @@  #include <pulsecore/log.h>  #include <pulsecore/mcalign.h>  #include <pulsecore/macro.h> +#include <pulsecore/flist.h>  #include "memblockq.h" -struct memblock_list { -    struct memblock_list *next, *prev; +struct list_item { +    struct list_item *next, *prev;      int64_t index;      pa_memchunk chunk;  }; +PA_STATIC_FLIST_DECLARE(list_items, 0, pa_xfree); +          struct pa_memblockq { -    struct memblock_list *blocks, *blocks_tail; +    struct list_item *blocks, *blocks_tail;      unsigned n_blocks;      size_t maxlength, tlength, base, prebuf, minreq;      int64_t read_index, write_index; -    enum { PREBUF, RUNNING } state; +    int in_prebuf;      pa_memblock *silence;      pa_mcalign *mcalign;  }; @@ -77,13 +80,13 @@ pa_memblockq* pa_memblockq_new(      bq->read_index = bq->write_index = idx;      pa_log_debug("memblockq requested: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu", -        (unsigned long)maxlength, (unsigned long)tlength, (unsigned long)base, (unsigned long)prebuf, (unsigned long)minreq); +        (unsigned long) maxlength, (unsigned long) tlength, (unsigned long) base, (unsigned long) prebuf, (unsigned long) minreq);      bq->maxlength = ((maxlength+base-1)/base)*base;      pa_assert(bq->maxlength >= base);      bq->tlength = ((tlength+base-1)/base)*base; -    if (!bq->tlength || bq->tlength >= bq->maxlength) +    if (bq->tlength <= 0 || bq->tlength > bq->maxlength)          bq->tlength = bq->maxlength;      bq->prebuf = (prebuf == (size_t) -1) ? bq->tlength/2 : prebuf; @@ -102,7 +105,7 @@ pa_memblockq* pa_memblockq_new(      pa_log_debug("memblockq sanitized: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu",          (unsigned long)bq->maxlength, (unsigned long)bq->tlength, (unsigned long)bq->base, (unsigned long)bq->prebuf, (unsigned long)bq->minreq); -    bq->state = bq->prebuf ? PREBUF : RUNNING; +    bq->in_prebuf = bq->prebuf > 0;      bq->silence = silence ? pa_memblock_ref(silence) : NULL;      bq->mcalign = NULL; @@ -113,7 +116,7 @@ void pa_memblockq_free(pa_memblockq* bq) {      pa_assert(bq);      pa_memblockq_flush(bq); - +          if (bq->silence)          pa_memblock_unref(bq->silence); @@ -123,7 +126,7 @@ void pa_memblockq_free(pa_memblockq* bq) {      pa_xfree(bq);  } -static void drop_block(pa_memblockq *bq, struct memblock_list *q) { +static void drop_block(pa_memblockq *bq, struct list_item *q) {      pa_assert(bq);      pa_assert(q); @@ -140,7 +143,9 @@ static void drop_block(pa_memblockq *bq, struct memblock_list *q) {          bq->blocks_tail = q->prev;      pa_memblock_unref(q->chunk.memblock); -    pa_xfree(q); + +    if (pa_flist_push(PA_STATIC_FLIST_GET(list_items), q) < 0) +        pa_xfree(q);      bq->n_blocks--;  } @@ -171,7 +176,7 @@ static int can_push(pa_memblockq *bq, size_t l) {  int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) { -    struct memblock_list *q, *n; +    struct list_item *q, *n;      pa_memchunk chunk;      pa_assert(bq); @@ -198,7 +203,7 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {          if (chunk.length > d) {              chunk.index += d;              chunk.length -= d; -            bq->write_index = bq->read_index; +            bq->write_index += d;          } else {              /* We drop the incoming data completely */              bq->write_index += chunk.length; @@ -212,10 +217,10 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {      q = bq->blocks_tail;      while (q) { -        if (bq->write_index >= q->index + (int64_t)q->chunk.length) +        if (bq->write_index >= q->index + (int64_t) q->chunk.length)              /* We found the entry where we need to place the new entry immediately after */              break; -        else if (bq->write_index + (int64_t)chunk.length <= q->index) { +        else if (bq->write_index + (int64_t) chunk.length <= q->index) {              /* This entry isn't touched at all, let's skip it */              q = q->prev;          } else if (bq->write_index <= q->index && @@ -223,7 +228,7 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {              /* This entry is fully replaced by the new entry, so let's drop it */ -            struct memblock_list *p; +            struct list_item *p;              p = q;              q = q->prev;              drop_block(bq, p); @@ -234,11 +239,13 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {              if (bq->write_index + chunk.length < q->index + q->chunk.length) {                  /* We need to save the end of this memchunk */ -                struct memblock_list *p; +                struct list_item *p;                  size_t d;                  /* Create a new list entry for the end of thie memchunk */ -                p = pa_xnew(struct memblock_list, 1); +                if (!(p = pa_flist_pop(PA_STATIC_FLIST_GET(list_items)))) +                    p = pa_xnew(struct list_item, 1); +                                  p->chunk = q->chunk;                  pa_memblock_ref(p->chunk.memblock); @@ -263,7 +270,7 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {              /* Truncate the chunk */              if (!(q->chunk.length = bq->write_index - q->index)) { -                struct memblock_list *p; +                struct list_item *p;                  p = q;                  q = q->prev;                  drop_block(bq, p); @@ -287,7 +294,6 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {              q = q->prev;          } -      }      if (q) { @@ -308,7 +314,9 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {          pa_assert(!bq->blocks || (bq->write_index + (int64_t)chunk.length <= bq->blocks->index)); -    n = pa_xnew(struct memblock_list, 1); +    if (!(n = pa_flist_pop(PA_STATIC_FLIST_GET(list_items)))) +        n = pa_xnew(struct list_item, 1); +          n->chunk = chunk;      pa_memblock_ref(n->chunk.memblock);      n->index = bq->write_index; @@ -331,24 +339,34 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {      return 0;  } -int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) { +static int memblockq_check_prebuf(pa_memblockq *bq) {      pa_assert(bq); -    pa_assert(chunk); +     +    if (bq->in_prebuf) { +         +        if (pa_memblockq_get_length(bq) < bq->prebuf) +            return 1; -    if (bq->state == PREBUF) { +        bq->in_prebuf = 0; +        return 0; +    } else { -        /* We need to pre-buffer */ -        if (pa_memblockq_get_length(bq) < bq->prebuf) -            return -1; +        if (bq->prebuf > 0 && bq->read_index >= bq->write_index) { +            bq->in_prebuf = 1; +            return 1; +        } -        bq->state = RUNNING; +        return 0; +    } +} -    } else if (bq->prebuf > 0 && bq->read_index >= bq->write_index) { +int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) { +    pa_assert(bq); +    pa_assert(chunk); -        /* Buffer underflow protection */ -        bq->state = PREBUF; +    /* We need to pre-buffer */ +    if (memblockq_check_prebuf(bq))          return -1; -    }      /* Do we need to spit out silence? */      if (!bq->blocks || bq->blocks->index > bq->read_index) { @@ -390,43 +408,16 @@ int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) {      return 0;  } -void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length) { +void pa_memblockq_drop(pa_memblockq *bq, size_t length) {      pa_assert(bq);      pa_assert(length % bq->base == 0); -    pa_assert(!chunk || length <= chunk->length); - -    if (chunk) { - -        if (bq->blocks && bq->blocks->index == bq->read_index) { -            /* The first item in queue is valid */ - -            /* Does the chunk match with what the user supplied us? */ -            if (memcmp(chunk, &bq->blocks->chunk, sizeof(pa_memchunk)) != 0) -                return; - -        } else { -            size_t l; - -            /* The first item in the queue is not yet relevant */ - -            pa_assert(!bq->blocks || bq->blocks->index > bq->read_index); -            l = bq->blocks ? bq->blocks->index - bq->read_index : 0; - -            if (bq->silence) { - -                if (!l || l > pa_memblock_get_length(bq->silence)) -                    l = pa_memblock_get_length(bq->silence); - -            } - -            /* Do the entries still match? */ -            if (chunk->index != 0 || chunk->length != l || chunk->memblock != bq->silence) -                return; -        } -    } - +          while (length > 0) { +        /* Do not drop any data when we are in prebuffering mode */ +        if (memblockq_check_prebuf(bq)) +            break; +                  if (bq->blocks) {              size_t d; @@ -476,15 +467,11 @@ void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length  int pa_memblockq_is_readable(pa_memblockq *bq) {      pa_assert(bq); -    if (bq->prebuf > 0) { -        size_t l = pa_memblockq_get_length(bq); - -        if (bq->state == PREBUF && l < bq->prebuf) -            return 0; +    if (memblockq_check_prebuf(bq)) +        return 0; -        if (l <= 0) -            return 0; -    } +    if (pa_memblockq_get_length(bq) <= 0) +        return 0;      return 1;  } @@ -506,7 +493,7 @@ size_t pa_memblockq_missing(pa_memblockq *bq) {          return 0;      l = bq->tlength - l; -    return (l >= bq->minreq) ? l : 0; +    return l >= bq->minreq ? l : 0;  }  size_t pa_memblockq_get_minreq(pa_memblockq *bq) { @@ -529,7 +516,7 @@ void pa_memblockq_seek(pa_memblockq *bq, int64_t offset, pa_seek_mode_t seek) {              bq->write_index = bq->read_index + offset;              return;          case PA_SEEK_RELATIVE_END: -            bq->write_index = (bq->blocks_tail ? bq->blocks_tail->index + (int64_t)bq->blocks_tail->chunk.length : bq->read_index) + offset; +            bq->write_index = (bq->blocks_tail ? bq->blocks_tail->index + (int64_t) bq->blocks_tail->chunk.length : bq->read_index) + offset;              return;      } @@ -569,7 +556,7 @@ int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk) {      pa_memchunk rchunk;      pa_assert(bq); -    pa_assert(chunk && bq->base); +    pa_assert(chunk);      if (bq->base == 1)          return pa_memblockq_push(bq, chunk); @@ -601,21 +588,20 @@ void pa_memblockq_shorten(pa_memblockq *bq, size_t length) {      l = pa_memblockq_get_length(bq);      if (l > length) -        pa_memblockq_drop(bq, NULL, l - length); +        pa_memblockq_drop(bq, l - length);  }  void pa_memblockq_prebuf_disable(pa_memblockq *bq) {      pa_assert(bq); -    if (bq->state == PREBUF) -        bq->state = RUNNING; +    bq->in_prebuf = 0;  }  void pa_memblockq_prebuf_force(pa_memblockq *bq) {      pa_assert(bq); -    if (bq->state == RUNNING && bq->prebuf > 0) -        bq->state = PREBUF; +    if (!bq->in_prebuf && bq->prebuf > 0) +        bq->in_prebuf = 1;  }  size_t pa_memblockq_get_maxlength(pa_memblockq *bq) { diff --git a/src/pulsecore/memblockq.h b/src/pulsecore/memblockq.h index e8243568..5eb23aac 100644 --- a/src/pulsecore/memblockq.h +++ b/src/pulsecore/memblockq.h @@ -83,13 +83,16 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *chunk);   * you know what you do. */  int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk); -/* Return a copy of the next memory chunk in the queue. It is not removed from the queue */ +/* Return a copy of the next memory chunk in the queue. It is not + * removed from the queue. There are two reasons this function might + * fail: 1. prebuffering is active, 2. queue is empty and no silence + * memblock was passed at initialization. If the queue is not empty, + * but we're currently at a hole in the queue and no silence memblock + * was passed we return the length of the hole in chunk->length. */  int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk); -/* Drop the specified bytes from the queue, but only if the first - * chunk in the queue matches the one passed here. If NULL is passed, - * this check isn't done. */ -void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length); +/* Drop the specified bytes from the queue. */ +void pa_memblockq_drop(pa_memblockq *bq, size_t length);  /* Test if the pa_memblockq is currently readable, that is, more data than base */  int pa_memblockq_is_readable(pa_memblockq *bq); diff --git a/src/pulsecore/play-memblockq.c b/src/pulsecore/play-memblockq.c index 9c5945af..51ea22e8 100644 --- a/src/pulsecore/play-memblockq.c +++ b/src/pulsecore/play-memblockq.c @@ -37,7 +37,7 @@  #include "play-memblockq.h" -static void sink_input_kill(pa_sink_input *i) { +static void sink_input_kill_cb(pa_sink_input *i) {      pa_memblockq *q;      assert(i);      assert(i->userdata); @@ -50,7 +50,7 @@ static void sink_input_kill(pa_sink_input *i) {      pa_memblockq_free(q);  } -static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) { +static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {      pa_memblockq *q;      assert(i);      assert(chunk); @@ -61,11 +61,11 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {      return pa_memblockq_peek(q, chunk);  } -static void si_kill(PA_GCC_UNUSED pa_mainloop_api *m, void *i) { -    sink_input_kill(i); +static void si_kill_cb(PA_GCC_UNUSED pa_mainloop_api *m, void *i) { +    sink_input_kill_cb(i);  } -static void sink_input_drop(pa_sink_input *i, const pa_memchunk*chunk, size_t length) { +static void sink_input_drop_cb(pa_sink_input *i, size_t length) {      pa_memblockq *q;      assert(i); @@ -74,10 +74,10 @@ static void sink_input_drop(pa_sink_input *i, const pa_memchunk*chunk, size_t le      q = i->userdata; -    pa_memblockq_drop(q, chunk, length); +    pa_memblockq_drop(q, length);      if (pa_memblockq_get_length(q) <= 0) -        pa_mainloop_api_once(i->sink->core->mainloop, si_kill, i); +        pa_mainloop_api_once(i->sink->core->mainloop, si_kill_cb, i);  }  int pa_play_memblockq( @@ -116,9 +116,9 @@ int pa_play_memblockq(      if (!(si = pa_sink_input_new(sink->core, &data, 0)))          return -1; -    si->peek = sink_input_peek; -    si->drop = sink_input_drop; -    si->kill = sink_input_kill; +    si->peek = sink_input_peek_cb; +    si->drop = sink_input_drop_cb; +    si->kill = sink_input_kill_cb;      si->userdata = q; diff --git a/src/pulsecore/play-memchunk.c b/src/pulsecore/play-memchunk.c index 65b6e825..7e750baa 100644 --- a/src/pulsecore/play-memchunk.c +++ b/src/pulsecore/play-memchunk.c @@ -37,7 +37,7 @@  #include "play-memchunk.h" -static void sink_input_kill(pa_sink_input *i) { +static void sink_input_kill_cb(pa_sink_input *i) {      pa_memchunk *c;      assert(i && i->userdata);      c = i->userdata; @@ -49,7 +49,7 @@ static void sink_input_kill(pa_sink_input *i) {      pa_xfree(c);  } -static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) { +static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {      pa_memchunk *c;      assert(i && chunk && i->userdata);      c = i->userdata; @@ -64,23 +64,24 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {      return 0;  } -static void si_kill(PA_GCC_UNUSED pa_mainloop_api *m, void *i) { -    sink_input_kill(i); +static void si_kill_cb(PA_GCC_UNUSED pa_mainloop_api *m, void *i) { +    sink_input_kill_cb(i);  } -static void sink_input_drop(pa_sink_input *i, const pa_memchunk*chunk, size_t length) { +static void sink_input_drop_cb(pa_sink_input *i, size_t length) {      pa_memchunk *c;      assert(i && length && i->userdata);      c = i->userdata; -    assert(!memcmp(chunk, c, sizeof(chunk))); -    assert(length <= c->length); +    if (length >= c->length) { +        c->length -= length; +        c->index += length; +    } else { -    c->length -= length; -    c->index += length; +        c->length = 0; -    if (c->length <= 0) -        pa_mainloop_api_once(i->sink->core->mainloop, si_kill, i); +        pa_mainloop_api_once(i->sink->core->mainloop, si_kill_cb, i); +    }  }  int pa_play_memchunk( @@ -113,9 +114,9 @@ int pa_play_memchunk(      if (!(si = pa_sink_input_new(sink->core, &data, 0)))          return -1; -    si->peek = sink_input_peek; -    si->drop = sink_input_drop; -    si->kill = sink_input_kill; +    si->peek = sink_input_peek_cb; +    si->drop = sink_input_drop_cb; +    si->kill = sink_input_kill_cb;      si->userdata = nchunk = pa_xnew(pa_memchunk, 1);      *nchunk = *chunk; diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index c423487a..8f9aed58 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -67,7 +67,6 @@ typedef struct connection {  PA_DECLARE_CLASS(connection);  #define CONNECTION(o) (connection_cast(o)) -  static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobject_check_type);  struct pa_protocol_simple { @@ -230,7 +229,7 @@ static int do_write(connection *c) {          return -1;      } -    pa_memblockq_drop(c->output_memblockq, &chunk, r); +    pa_memblockq_drop(c->output_memblockq, r);      return 0;  } @@ -271,7 +270,6 @@ fail:  static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) {      connection *c = CONNECTION(o); -          connection_assert_ref(c);      switch (code) { @@ -351,13 +349,13 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {  /*     pa_log("peeked %u %i", r >= 0 ? chunk->length: 0, r); */      if (c->dead && r < 0) -        pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, c, NULL, NULL); +        pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, NULL, NULL, NULL);      return r;  }  /* Called from thread context */ -static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) { +static void sink_input_drop_cb(pa_sink_input *i, size_t length) {      connection*c = i->userdata;      size_t old, new; @@ -366,7 +364,7 @@ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_      pa_assert(length);      old = pa_memblockq_missing(c->input_memblockq); -    pa_memblockq_drop(c->input_memblockq, chunk, length); +    pa_memblockq_drop(c->input_memblockq, length);      new = pa_memblockq_missing(c->input_memblockq);      if (new > old) { @@ -378,9 +376,8 @@ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_  /* Called from main context */  static void sink_input_kill_cb(pa_sink_input *i) {      pa_assert(i); -    pa_assert(i->userdata); -    connection_drop((connection *) i->userdata); +    connection_drop(CONNECTION(i->userdata));  }  /*** source_output callbacks ***/ diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index d27f00f0..db98dd54 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -341,7 +341,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)  /*     } */      if (!i->thread_info.resampler) { -        do_volume_adj_here = 0; +        do_volume_adj_here = 0; /* FIXME??? */          ret = i->peek(i, chunk);          goto finish;      } @@ -356,15 +356,14 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)          if ((ret = i->peek(i, &tchunk)) < 0)              goto finish; -        pa_assert(tchunk.length); +        pa_assert(tchunk.length > 0);          l = pa_resampler_request(i->thread_info.resampler, CONVERT_BUFFER_LENGTH); -        if (l > tchunk.length) -            l = tchunk.length; +        if (tchunk.length > l) +            tchunk.length = l; -        i->drop(i, &tchunk, l); -        tchunk.length = l; +        i->drop(i, tchunk.length);          /* It might be necessary to adjust the volume here */          if (do_volume_adj_here && !volume_is_norm) { @@ -377,7 +376,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)      }      pa_assert(i->thread_info.resampled_chunk.memblock); -    pa_assert(i->thread_info.resampled_chunk.length); +    pa_assert(i->thread_info.resampled_chunk.length > 0);      *chunk = i->thread_info.resampled_chunk;      pa_memblock_ref(i->thread_info.resampled_chunk.memblock); @@ -409,7 +408,7 @@ finish:      return ret;  } -void pa_sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t length) { +void pa_sink_input_drop(pa_sink_input *i, size_t length) {      pa_sink_input_assert_ref(i);      pa_assert(length > 0); @@ -440,22 +439,67 @@ void pa_sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t lengt  /*         return; */  /*     } */ -    if (!i->thread_info.resampler) { -        if (i->drop) -            i->drop(i, chunk, length); -        return; -    } - -    pa_assert(i->thread_info.resampled_chunk.memblock); -    pa_assert(i->thread_info.resampled_chunk.length >= length); +    pa_log("dropping %u", length); +     +    if (i->thread_info.resampled_chunk.memblock) { +        size_t l = length; + +        if (l > i->thread_info.resampled_chunk.length) +            l = i->thread_info.resampled_chunk.length; + +        pa_log("really dropping %u", l); +         +        i->thread_info.resampled_chunk.index += l; +        i->thread_info.resampled_chunk.length -= l; +         +        if (i->thread_info.resampled_chunk.length <= 0) { +            pa_memblock_unref(i->thread_info.resampled_chunk.memblock); +            pa_memchunk_reset(&i->thread_info.resampled_chunk); +        } -    i->thread_info.resampled_chunk.index += length; -    i->thread_info.resampled_chunk.length -= length; +        length -= l; +    } -    if (i->thread_info.resampled_chunk.length <= 0) { -        pa_memblock_unref(i->thread_info.resampled_chunk.memblock); -        i->thread_info.resampled_chunk.memblock = NULL; -        i->thread_info.resampled_chunk.index = i->thread_info.resampled_chunk.length = 0; +    pa_log("really remaining %u", length); +     +    if (length > 0) { +         +        if (i->thread_info.resampler) { +            /* So, we have a resampler. To avoid discontinuities we +             * have to actually read all data that could be read and +             * pass it through the resampler. */ + +            while (length > 0) { +                pa_memchunk chunk; +                pa_cvolume volume; +                 +                if (pa_sink_input_peek(i, &chunk, &volume) >= 0) { +                    size_t l = chunk.length; + +                    if (l > length) +                        l = length; +                     +                    pa_sink_input_drop(i, l); +                    length -= l; +                     +                } else { +                    /* Hmmm, peeking failed, so let's at least drop +                     * the right amount of data */ +                     +                    if (i->drop) +                        i->drop(i, pa_resampler_request(i->thread_info.resampler, length)); +                             +                    break; +                } +            } + +        } else { + +            /* We have no resampler, hence let's just drop the data */ + +            if (i->drop) +                i->drop(i, length); +        }      }  } diff --git a/src/pulsecore/sink-input.h b/src/pulsecore/sink-input.h index 426e48c0..fe62917a 100644 --- a/src/pulsecore/sink-input.h +++ b/src/pulsecore/sink-input.h @@ -75,7 +75,7 @@ struct pa_sink_input {      int muted;      int (*peek) (pa_sink_input *i, pa_memchunk *chunk); -    void (*drop) (pa_sink_input *i, const pa_memchunk *chunk, size_t length); +    void (*drop) (pa_sink_input *i, size_t length);      void (*kill) (pa_sink_input *i);             /* may be NULL */      pa_usec_t (*get_latency) (pa_sink_input *i); /* may be NULL */      void (*underrun) (pa_sink_input *i);         /* may be NULL */ @@ -178,7 +178,7 @@ pa_sink_input_state_t pa_sink_input_get_state(pa_sink_input *i);  /* To be used exclusively by the sink driver thread */  int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume); -void pa_sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t length); +void pa_sink_input_drop(pa_sink_input *i, size_t length);  int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk);  #endif diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 5a79a41c..a66097bc 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -326,7 +326,7 @@ static void inputs_drop(pa_sink *s, pa_mix_info *info, unsigned n, size_t length          }          /* Drop read data */ -        pa_sink_input_drop(i, m ? &m->chunk : NULL, length); +        pa_sink_input_drop(i, length);          if (m) {              pa_sink_input_unref(m->userdata); diff --git a/src/pulsecore/sound-file-stream.c b/src/pulsecore/sound-file-stream.c index 974c053a..c31187c5 100644 --- a/src/pulsecore/sound-file-stream.c +++ b/src/pulsecore/sound-file-stream.c @@ -26,7 +26,6 @@  #endif  #include <stdlib.h> -#include <assert.h>  #include <stdio.h>  #include <string.h> @@ -41,89 +40,177 @@  #define BUF_SIZE (1024*10) -struct userdata { +typedef struct file_stream { +    pa_msgobject parent; +    pa_core *core;      SNDFILE *sndfile;      pa_sink_input *sink_input;      pa_memchunk memchunk;      sf_count_t (*readf_function)(SNDFILE *sndfile, void *ptr, sf_count_t frames); +    size_t drop; +} file_stream; + +enum { +    MESSAGE_DROP_FILE_STREAM  }; -static void free_userdata(struct userdata *u) { -    assert(u); -    if (u->sink_input) { -        pa_sink_input_disconnect(u->sink_input); -        pa_sink_input_unref(u->sink_input); -    } +PA_DECLARE_CLASS(file_stream); +#define FILE_STREAM(o) (file_stream_cast(o)) +static PA_DEFINE_CHECK_TYPE(file_stream, file_stream_check_type, pa_msgobject_check_type); + +static void file_stream_free(pa_object *o) { +    file_stream *u = FILE_STREAM(o); +    pa_assert(u); +    pa_log("xxxx ffreee"); +          if (u->memchunk.memblock)          pa_memblock_unref(u->memchunk.memblock); +      if (u->sndfile)          sf_close(u->sndfile);      pa_xfree(u);  } -static void sink_input_kill(pa_sink_input *i) { -    assert(i && i->userdata); -    free_userdata(i->userdata); -} - -static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) { -    struct userdata *u; -    assert(i && chunk && i->userdata); -    u = i->userdata; +static void file_stream_drop(file_stream *u) { +    file_stream_assert_ref(u); -    if (!u->memchunk.memblock) { -        uint32_t fs = pa_frame_size(&i->sample_spec); -        sf_count_t n; -        void *p; +    pa_log("xxxx drop"); +     +     +    if (u->sink_input) { +        pa_sink_input_disconnect(u->sink_input); +        pa_sink_input_unref(u->sink_input); +        u->sink_input = NULL; -        u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE); -        u->memchunk.index = 0; +        /* Make sure we don't decrease the ref count twice. */ +        file_stream_unref(u); +    } +} -        p = pa_memblock_acquire(u->memchunk.memblock); +static int file_stream_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) { +    file_stream *u = FILE_STREAM(o); +    file_stream_assert_ref(u); +     +    switch (code) { +        case MESSAGE_DROP_FILE_STREAM: +            file_stream_drop(u); +            break; +    } -        if (u->readf_function) { -            if ((n = u->readf_function(u->sndfile, p, BUF_SIZE/fs)) <= 0) -                n = 0; +    return 0; +} -            u->memchunk.length = n * fs; -        } else { -            if ((n = sf_read_raw(u->sndfile, p, BUF_SIZE)) <= 0) -                n = 0; +static void sink_input_kill_cb(pa_sink_input *i) { +    pa_assert(i); +     +    file_stream_drop(FILE_STREAM(i->userdata)); +} -            u->memchunk.length = n; +static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { +    file_stream *u; +     +    pa_assert(i); +    pa_assert(chunk); +    u = FILE_STREAM(i->userdata); +    file_stream_assert_ref(u); + +    for (;;) { +         +        if (!u->memchunk.memblock) { +             +            u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE); +            u->memchunk.index = 0; +             +            if (u->readf_function) { +                sf_count_t n; +                void *p; +                size_t fs = pa_frame_size(&i->sample_spec); +                 +                p = pa_memblock_acquire(u->memchunk.memblock); +                n = u->readf_function(u->sndfile, p, BUF_SIZE/fs); +                pa_memblock_release(u->memchunk.memblock); + +                pa_log("%u/%u = data: %02x %02x %02x %02x %02x %02x %02x %02x", +                       (unsigned int) n, BUF_SIZE/fs, +                       ((uint8_t*)p)[0], ((uint8_t*)p)[1], ((uint8_t*)p)[2], ((uint8_t*)p)[3], +                       ((uint8_t*)p)[4], ((uint8_t*)p)[5], ((uint8_t*)p)[6], ((uint8_t*)p)[7]); +                 +                if (n <= 0) +                    n = 0; +                 +                u->memchunk.length = n * fs; +            } else { +                sf_count_t n; +                void *p; + +                p = pa_memblock_acquire(u->memchunk.memblock); +                n = sf_read_raw(u->sndfile, p, BUF_SIZE); +                pa_memblock_release(u->memchunk.memblock); +                 +                if (n <= 0) +                    n = 0; +                 +                u->memchunk.length = n; +            } +             +            if (u->memchunk.length <= 0) { + +                pa_memblock_unref(u->memchunk.memblock); +                pa_memchunk_reset(&u->memchunk); +                 +                pa_asyncmsgq_post(u->core->asyncmsgq, PA_MSGOBJECT(u), MESSAGE_DROP_FILE_STREAM, NULL, NULL, NULL); +                return -1; +            }          } -        pa_memblock_release(u->memchunk.memblock); -        if (!u->memchunk.length) { -            free_userdata(u); -            return -1; +        pa_assert(u->memchunk.memblock); +        pa_assert(u->memchunk.length > 0); + +        if (u->drop < u->memchunk.length) { +            u->memchunk.index += u->drop; +            u->memchunk.length -= u->drop; +            u->drop = 0; +            break;          } +                 +        u->drop -= u->memchunk.length; +        pa_memblock_unref(u->memchunk.memblock); +        pa_memchunk_reset(&u->memchunk);      }      *chunk = u->memchunk;      pa_memblock_ref(chunk->memblock); -    assert(chunk->length); +     +    pa_assert(chunk->length > 0); +    pa_assert(u->drop <= 0); +          return 0;  } -static void sink_input_drop(pa_sink_input *i, const pa_memchunk*chunk, size_t length) { -    struct userdata *u; -    assert(i && chunk && length && i->userdata); -    u = i->userdata; +static void sink_input_drop_cb(pa_sink_input *i, size_t length) { +    file_stream *u; -    assert(!memcmp(chunk, &u->memchunk, sizeof(chunk))); -    assert(length <= u->memchunk.length); +    pa_assert(i); +    pa_assert(length > 0); +    u = FILE_STREAM(i->userdata); +    file_stream_assert_ref(u); +     +    if (u->memchunk.memblock) { -    u->memchunk.index += length; -    u->memchunk.length -= length; +        if (length < u->memchunk.length) { +            u->memchunk.index += length; +            u->memchunk.length -= length; +            return; +        } -    if (u->memchunk.length <= 0) { +        length -= u->memchunk.length;          pa_memblock_unref(u->memchunk.memblock); -        u->memchunk.memblock = NULL; -        u->memchunk.index = u->memchunk.length = 0; +        pa_memchunk_reset(&u->memchunk);      } +             +    u->drop += length;  }  int pa_play_file( @@ -131,19 +218,23 @@ int pa_play_file(          const char *fname,          const pa_cvolume *volume) { -    struct userdata *u = NULL; +    file_stream *u = NULL;      SF_INFO sfinfo;      pa_sample_spec ss;      pa_sink_input_new_data data; -    assert(sink); -    assert(fname); +    pa_assert(sink); +    pa_assert(fname); -    u = pa_xnew(struct userdata, 1); +    u = pa_msgobject_new(file_stream, file_stream_check_type); +    u->parent.parent.free = file_stream_free; +    u->parent.process_msg = file_stream_process_msg; +    u->core = sink->core;      u->sink_input = NULL; -    u->memchunk.memblock = NULL; -    u->memchunk.index = u->memchunk.length = 0; +    pa_memchunk_reset(&u->memchunk);      u->sndfile = NULL; +    u->readf_function = NULL; +    u->drop = 0;      memset(&sfinfo, 0, sizeof(sfinfo)); @@ -152,8 +243,6 @@ int pa_play_file(          goto fail;      } -    u->readf_function = NULL; -      switch (sfinfo.format & 0xFF) {          case SF_FORMAT_PCM_16:          case SF_FORMAT_PCM_U8: @@ -195,18 +284,21 @@ int pa_play_file(      if (!(u->sink_input = pa_sink_input_new(sink->core, &data, 0)))          goto fail; -    u->sink_input->peek = sink_input_peek; -    u->sink_input->drop = sink_input_drop; -    u->sink_input->kill = sink_input_kill; +    u->sink_input->peek = sink_input_peek_cb; +    u->sink_input->drop = sink_input_drop_cb; +    u->sink_input->kill = sink_input_kill_cb;      u->sink_input->userdata = u; -/*     pa_sink_notify(u->sink_input->sink); */ +    pa_sink_input_put(u->sink_input); + +    /* The reference to u is dangling here, because we want to keep +     * this stream around until it is fully played. */      return 0;  fail:      if (u) -        free_userdata(u); +        file_stream_unref(u);      return -1;  } diff --git a/src/tests/memblockq-test.c b/src/tests/memblockq-test.c index 7ad3b2f3..25ea399b 100644 --- a/src/tests/memblockq-test.c +++ b/src/tests/memblockq-test.c @@ -26,6 +26,7 @@  #include <stdlib.h>  #include <assert.h>  #include <stdio.h> +#include <signal.h>  #include <pulsecore/memblockq.h>  #include <pulsecore/log.h> @@ -48,22 +49,22 @@ int main(int argc, char *argv[]) {      bq = pa_memblockq_new(0, 40, 10, 2, 4, 4, silence);      assert(bq); -    chunk1.memblock = pa_memblock_new_fixed(p, (char*) "AA", 2, 1); +    chunk1.memblock = pa_memblock_new_fixed(p, (char*) "11", 2, 1);      chunk1.index = 0;      chunk1.length = 2;      assert(chunk1.memblock); -    chunk2.memblock = pa_memblock_new_fixed(p, (char*) "TTBB", 4, 1); +    chunk2.memblock = pa_memblock_new_fixed(p, (char*) "XX22", 4, 1);      chunk2.index = 2;      chunk2.length = 2;      assert(chunk2.memblock); -    chunk3.memblock = pa_memblock_new_fixed(p, (char*) "ZZZZ", 4, 1); +    chunk3.memblock = pa_memblock_new_fixed(p, (char*) "3333", 4, 1);      chunk3.index = 0;      chunk3.length = 4;      assert(chunk3.memblock); -    chunk4.memblock = pa_memblock_new_fixed(p, (char*) "KKKKKKKK", 8, 1); +    chunk4.memblock = pa_memblock_new_fixed(p, (char*) "44444444", 8, 1);      chunk4.index = 0;      chunk4.length = 8;      assert(chunk4.memblock); @@ -115,13 +116,12 @@ int main(int argc, char *argv[]) {      chunk3.index += 2;      chunk3.length -= 2; -      ret = pa_memblockq_push(bq, &chunk3);      assert(ret == 0); -    printf(">"); +    pa_memblockq_shorten(bq, pa_memblockq_get_length(bq)-2); -    pa_memblockq_shorten(bq, 6); +    printf(">");      for (;;) {          pa_memchunk out; @@ -137,7 +137,7 @@ int main(int argc, char *argv[]) {          pa_memblock_release(out.memblock);          pa_memblock_unref(out.memblock); -        pa_memblockq_drop(bq, &out, out.length); +        pa_memblockq_drop(bq, out.length);      }      printf("<\n"); | 
