diff options
Diffstat (limited to 'src')
36 files changed, 993 insertions, 502 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 64df8614..0ce805ec 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -268,8 +268,7 @@ thread_test_CFLAGS = $(AM_CFLAGS)  thread_test_LDADD = $(AM_LDADD) libpulsecore.la  thread_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS)  -flist_test_SOURCES = tests/flist-test.c \ -		pulsecore/flist.c pulsecore/flist.h +flist_test_SOURCES = tests/flist-test.c  flist_test_CFLAGS = $(AM_CFLAGS)  flist_test_LDADD = $(AM_LDADD) libpulsecore.la  flist_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS)  @@ -448,6 +447,8 @@ libpulse_la_SOURCES += \  		pulsecore/core-error.c pulsecore/core-error.h \  		pulsecore/winsock.h pulsecore/creds.h \  		pulsecore/shm.c pulsecore/shm.h \ +		pulsecore/flist.c pulsecore/flist.h \ +		pulsecore/anotify.c pulsecore/anotify.h \  		$(PA_THREAD_OBJS)  if OS_IS_WIN32 @@ -628,6 +629,8 @@ libpulsecore_la_SOURCES += \  		pulsecore/core-error.c pulsecore/core-error.h \  		pulsecore/hook-list.c pulsecore/hook-list.h \  		pulsecore/shm.c pulsecore/shm.h \ +		pulsecore/flist.c pulsecore/flist.h \ +		pulsecore/anotify.c pulsecore/anotify.h \  		$(PA_THREAD_OBJS)  if OS_IS_WIN32 diff --git a/src/modules/module-alsa-sink.c b/src/modules/module-alsa-sink.c index 6ff9a6e4..7bbd7de2 100644 --- a/src/modules/module-alsa-sink.c +++ b/src/modules/module-alsa-sink.c @@ -144,6 +144,7 @@ static void do_write(struct userdata *u) {      update_usage(u);      for (;;) { +        void *p;          pa_memchunk *memchunk = NULL;          snd_pcm_sframes_t frames; @@ -156,9 +157,15 @@ static void do_write(struct userdata *u) {                  memchunk = &u->memchunk;          } -        assert(memchunk->memblock && memchunk->memblock->data && memchunk->length && memchunk->memblock->length && (memchunk->length % u->frame_size) == 0); +        assert(memchunk->memblock); +        assert(memchunk->length); +        assert((memchunk->length % u->frame_size) == 0); -        if ((frames = snd_pcm_writei(u->pcm_handle, (uint8_t*) memchunk->memblock->data + memchunk->index, memchunk->length / u->frame_size)) < 0) { +        p = pa_memblock_acquire(memchunk->memblock); +         +        if ((frames = snd_pcm_writei(u->pcm_handle, (uint8_t*) p + memchunk->index, memchunk->length / u->frame_size)) < 0) { +            pa_memblock_release(memchunk->memblock); +                          if (frames == -EAGAIN)                  return; @@ -176,6 +183,9 @@ static void do_write(struct userdata *u) {              return;          } +        pa_memblock_release(memchunk->memblock); +         +          if (memchunk == &u->memchunk) {              size_t l = frames * u->frame_size;              memchunk->index += l; diff --git a/src/modules/module-alsa-source.c b/src/modules/module-alsa-source.c index aa0666f1..9bde46da 100644 --- a/src/modules/module-alsa-source.c +++ b/src/modules/module-alsa-source.c @@ -149,6 +149,7 @@ static void do_read(struct userdata *u) {          pa_memchunk post_memchunk;          snd_pcm_sframes_t frames;          size_t l; +        void *p;          if (!u->memchunk.memblock) {              u->memchunk.memblock = pa_memblock_new(u->source->core->mempool, u->memchunk.length = u->fragment_size); @@ -157,11 +158,13 @@ static void do_read(struct userdata *u) {          assert(u->memchunk.memblock);          assert(u->memchunk.length); -        assert(u->memchunk.memblock->data); -        assert(u->memchunk.memblock->length);          assert(u->memchunk.length % u->frame_size == 0); -        if ((frames = snd_pcm_readi(u->pcm_handle, (uint8_t*) u->memchunk.memblock->data + u->memchunk.index, u->memchunk.length / u->frame_size)) < 0) { +        p = pa_memblock_acquire(u->memchunk.memblock); +         +        if ((frames = snd_pcm_readi(u->pcm_handle, (uint8_t*) p + u->memchunk.index, u->memchunk.length / u->frame_size)) < 0) { +            pa_memblock_release(u->memchunk.memblock); +                          if (frames == -EAGAIN)                  return; @@ -178,6 +181,7 @@ static void do_read(struct userdata *u) {              pa_module_unload_request(u->module);              return;          } +        pa_memblock_release(u->memchunk.memblock);          l = frames * u->frame_size; diff --git a/src/modules/module-esound-sink.c b/src/modules/module-esound-sink.c index 6d4a8489..ca1f16ce 100644 --- a/src/modules/module-esound-sink.c +++ b/src/modules/module-esound-sink.c @@ -142,18 +142,25 @@ static int do_write(struct userdata *u) {              u->write_index = u->write_length = 0;          }      } else if (u->state == STATE_RUNNING) { +        void *p; +                  pa_module_set_used(u->module, pa_sink_used_by(u->sink));          if (!u->memchunk.length)              if (pa_sink_render(u->sink, 8192, &u->memchunk) < 0)                  return 0; -        assert(u->memchunk.memblock && u->memchunk.length); +        assert(u->memchunk.memblock); +        assert(u->memchunk.length); + +        p = pa_memblock_acquire(u->memchunk.memblock); -        if ((r = pa_iochannel_write(u->io, (uint8_t*) u->memchunk.memblock->data + u->memchunk.index, u->memchunk.length)) < 0) { +        if ((r = pa_iochannel_write(u->io, (uint8_t*) p + u->memchunk.index, u->memchunk.length)) < 0) { +            pa_memblock_release(u->memchunk.memblock);              pa_log("write() failed: %s", pa_cstrerror(errno));              return -1;          } +        pa_memblock_release(u->memchunk.memblock);          u->memchunk.index += r;          u->memchunk.length -= r; diff --git a/src/modules/module-jack-sink.c b/src/modules/module-jack-sink.c index 47f77bab..66ded27f 100644 --- a/src/modules/module-jack-sink.c +++ b/src/modules/module-jack-sink.c @@ -135,22 +135,25 @@ static void io_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_          unsigned fs;          jack_nframes_t frame_idx;          pa_memchunk chunk; +        void *p;          fs = pa_frame_size(&u->sink->sample_spec);          pa_sink_render_full(u->sink, u->frames_requested * fs, &chunk); +        p = pa_memblock_acquire(chunk.memblock);          for (frame_idx = 0; frame_idx < u->frames_requested; frame_idx ++) {              unsigned c;              for (c = 0; c < u->channels; c++) { -                float *s = ((float*) ((uint8_t*) chunk.memblock->data + chunk.index)) + (frame_idx * u->channels) + c; +                float *s = ((float*) ((uint8_t*) p + chunk.index)) + (frame_idx * u->channels) + c;                  float *d = ((float*) u->buffer[c]) + frame_idx;                  *d = *s;              }          } +        pa_memblock_release(chunk.memblock);          pa_memblock_unref(chunk.memblock);          u->frames_requested = 0; diff --git a/src/modules/module-jack-source.c b/src/modules/module-jack-source.c index 62a99108..5270b241 100644 --- a/src/modules/module-jack-source.c +++ b/src/modules/module-jack-source.c @@ -134,23 +134,28 @@ static void io_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_          unsigned fs;          jack_nframes_t frame_idx;          pa_memchunk chunk; +        void *p;          fs = pa_frame_size(&u->source->sample_spec);          chunk.memblock = pa_memblock_new(u->core->mempool, chunk.length = u->frames_posted * fs);          chunk.index = 0; + +        p = pa_memblock_acquire(chunk.memblock);          for (frame_idx = 0; frame_idx < u->frames_posted; frame_idx ++) {              unsigned c;              for (c = 0; c < u->channels; c++) {                  float *s = ((float*) u->buffer[c]) + frame_idx; -                float *d = ((float*) ((uint8_t*) chunk.memblock->data + chunk.index)) + (frame_idx * u->channels) + c; +                float *d = ((float*) ((uint8_t*) p + chunk.index)) + (frame_idx * u->channels) + c;                  *d = *s;              }          } +        pa_memblock_release(chunk.memblock); +                  pa_source_post(u->source, &chunk);          pa_memblock_unref(chunk.memblock); diff --git a/src/modules/module-oss-mmap.c b/src/modules/module-oss-mmap.c index 5ab08287..39a8511f 100644 --- a/src/modules/module-oss-mmap.c +++ b/src/modules/module-oss-mmap.c @@ -170,7 +170,7 @@ static void out_fill_memblocks(struct userdata *u, unsigned n) {                      u->out_fragment_size,                      1);          assert(chunk.memblock); -        chunk.length = chunk.memblock->length; +        chunk.length = pa_memblock_get_length(chunk.memblock);          chunk.index = 0;          pa_sink_render_into_full(u->sink, &chunk); @@ -214,7 +214,7 @@ static void in_post_memblocks(struct userdata *u, unsigned n) {          if (!u->in_memblocks[u->in_current]) {              chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed(u->core->mempool, (uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1); -            chunk.length = chunk.memblock->length; +            chunk.length = pa_memblock_get_length(chunk.memblock);              chunk.index = 0;              pa_source_post(u->source, &chunk); diff --git a/src/modules/module-oss.c b/src/modules/module-oss.c index b71581d9..73f0d57e 100644 --- a/src/modules/module-oss.c +++ b/src/modules/module-oss.c @@ -155,6 +155,7 @@ static void do_write(struct userdata *u) {      }      do { +        void *p;          memchunk = &u->memchunk;          if (!memchunk->length) @@ -162,16 +163,18 @@ static void do_write(struct userdata *u) {                  memchunk = &u->silence;          assert(memchunk->memblock); -        assert(memchunk->memblock->data);          assert(memchunk->length); -         -        if ((r = pa_iochannel_write(u->io, (uint8_t*) memchunk->memblock->data + memchunk->index, memchunk->length)) < 0) { + +        p = pa_memblock_acquire(memchunk->memblock); +        if ((r = pa_iochannel_write(u->io, (uint8_t*) p + memchunk->index, memchunk->length)) < 0) { +            pa_memblock_release(memchunk->memblock);              pa_log("write() failed: %s", pa_cstrerror(errno));              clear_up(u);              pa_module_unload_request(u->module);              break;          } +        pa_memblock_release(memchunk->memblock);          if (memchunk == &u->silence)              assert(r % u->sample_size == 0); @@ -217,9 +220,13 @@ static void do_read(struct userdata *u) {      }      do { +        void *p;          memchunk.memblock = pa_memblock_new(u->core->mempool, l); -        assert(memchunk.memblock); -        if ((r = pa_iochannel_read(u->io, memchunk.memblock->data, memchunk.memblock->length)) < 0) { + +        p = pa_memblock_acquire(memchunk.memblock); +         +        if ((r = pa_iochannel_read(u->io, p, pa_memblock_get_length(memchunk.memblock))) < 0) { +            pa_memblock_release(memchunk.memblock);              pa_memblock_unref(memchunk.memblock);              if (errno != EAGAIN) {                  pa_log("read() failed: %s", pa_cstrerror(errno)); @@ -228,9 +235,10 @@ static void do_read(struct userdata *u) {              }              break;          } +        pa_memblock_release(memchunk.memblock); -        assert(r <= (ssize_t) memchunk.memblock->length); -        memchunk.length = memchunk.memblock->length = r; +        assert(r <= (ssize_t) pa_memblock_get_length(memchunk.memblock)); +        memchunk.length = r;          memchunk.index = 0;          pa_source_post(u->source, &memchunk); diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c index 4aee849b..59d91aa4 100644 --- a/src/modules/module-pipe-sink.c +++ b/src/modules/module-pipe-sink.c @@ -84,6 +84,8 @@ static const char* const valid_modargs[] = {  static void do_write(struct userdata *u) {      ssize_t r; +    void *p; +          assert(u);      u->core->mainloop->defer_enable(u->defer_event, 0); @@ -97,12 +99,17 @@ static void do_write(struct userdata *u) {          if (pa_sink_render(u->sink, PIPE_BUF, &u->memchunk) < 0)              return; -    assert(u->memchunk.memblock && u->memchunk.length); +    assert(u->memchunk.memblock); +    assert(u->memchunk.length); + +    p = pa_memblock_acquire(u->memchunk.memblock); -    if ((r = pa_iochannel_write(u->io, (uint8_t*) u->memchunk.memblock->data + u->memchunk.index, u->memchunk.length)) < 0) { +    if ((r = pa_iochannel_write(u->io, (uint8_t*) p + u->memchunk.index, u->memchunk.length)) < 0) { +        pa_memblock_release(u->memchunk.memblock);          pa_log("write(): %s", pa_cstrerror(errno));          return;      } +    pa_memblock_release(u->memchunk.memblock);      u->memchunk.index += r;      u->memchunk.length -= r; diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c index c251f7ac..99f4f3b9 100644 --- a/src/modules/module-pipe-source.c +++ b/src/modules/module-pipe-source.c @@ -82,7 +82,9 @@ static const char* const valid_modargs[] = {  static void do_read(struct userdata *u) {      ssize_t r; +    void *p;      pa_memchunk chunk; +          assert(u);      if (!pa_iochannel_is_readable(u->io)) @@ -95,17 +97,22 @@ static void do_read(struct userdata *u) {          u->chunk.index = chunk.length = 0;      } -    assert(u->chunk.memblock && u->chunk.memblock->length > u->chunk.index); -    if ((r = pa_iochannel_read(u->io, (uint8_t*) u->chunk.memblock->data + u->chunk.index, u->chunk.memblock->length - u->chunk.index)) <= 0) { +    assert(u->chunk.memblock); +    assert(pa_memblock_get_length(u->chunk.memblock) > u->chunk.index); + +    p = pa_memblock_acquire(u->chunk.memblock); +    if ((r = pa_iochannel_read(u->io, (uint8_t*) p + u->chunk.index, pa_memblock_get_length(u->chunk.memblock) - u->chunk.index)) <= 0) { +        pa_memblock_release(u->chunk.memblock);          pa_log("read(): %s", pa_cstrerror(errno));          return;      } +    pa_memblock_release(u->chunk.memblock);      u->chunk.length = r;      pa_source_post(u->source, &u->chunk);      u->chunk.index += r; -    if (u->chunk.index >= u->chunk.memblock->length) { +    if (u->chunk.index >= pa_memblock_get_length(u->chunk.memblock)) {          u->chunk.index = u->chunk.length = 0;          pa_memblock_unref(u->chunk.memblock);          u->chunk.memblock = NULL; diff --git a/src/modules/module-sine.c b/src/modules/module-sine.c index fa29ba16..f65b1f3a 100644 --- a/src/modules/module-sine.c +++ b/src/modules/module-sine.c @@ -63,7 +63,7 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {      chunk->memblock = pa_memblock_ref(u->memblock);      chunk->index = u->peek_index; -    chunk->length = u->memblock->length - u->peek_index; +    chunk->length = pa_memblock_get_length(u->memblock) - u->peek_index;      return 0;  } @@ -72,11 +72,12 @@ static void sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t l      assert(i && chunk && length && i->userdata);      u = i->userdata; -    assert(chunk->memblock == u->memblock && length <= u->memblock->length-u->peek_index); +    assert(chunk->memblock == u->memblock); +    assert(length <= pa_memblock_get_length(u->memblock)-u->peek_index);      u->peek_index += length; -    if (u->peek_index >= u->memblock->length) +    if (u->peek_index >= pa_memblock_get_length(u->memblock))          u->peek_index = 0;  } @@ -109,6 +110,7 @@ int pa__init(pa_core *c, pa_module*m) {      pa_sample_spec ss;      uint32_t frequency;      char t[256]; +    void *p;      pa_sink_input_new_data data;      if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { @@ -140,8 +142,10 @@ int pa__init(pa_core *c, pa_module*m) {      }      u->memblock = pa_memblock_new(c->mempool, pa_bytes_per_second(&ss)); -    calc_sine(u->memblock->data, u->memblock->length, frequency); - +    p = pa_memblock_acquire(u->memblock); +    calc_sine(p, pa_memblock_get_length(u->memblock), frequency); +    pa_memblock_release(u->memblock); +          snprintf(t, sizeof(t), "Sine Generator at %u Hz", frequency);      pa_sink_input_new_data_init(&data); diff --git a/src/modules/rtp/rtp.c b/src/modules/rtp/rtp.c index 3bb0ea47..a4362f84 100644 --- a/src/modules/rtp/rtp.c +++ b/src/modules/rtp/rtp.c @@ -79,7 +79,7 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {              size_t k = n + chunk.length > size ? size - n : chunk.length;              if (chunk.memblock) { -                iov[iov_idx].iov_base = (void*)((uint8_t*) chunk.memblock->data + chunk.index); +                iov[iov_idx].iov_base = (void*)((uint8_t*) pa_memblock_acquire(chunk.memblock) + chunk.index);                  iov[iov_idx].iov_len = k;                  mb[iov_idx] = chunk.memblock;                  iov_idx ++; @@ -114,8 +114,10 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {                  k = sendmsg(c->fd, &m, MSG_DONTWAIT); -                for (i = 1; i < iov_idx; i++) +                for (i = 1; i < iov_idx; i++) { +                    pa_memblock_release(mb[i]);                      pa_memblock_unref(mb[i]); +                }                  c->sequence++;              } else @@ -172,7 +174,7 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {      chunk->memblock = pa_memblock_new(pool, size); -    iov.iov_base = chunk->memblock->data; +    iov.iov_base = pa_memblock_acquire(chunk->memblock);      iov.iov_len = size;      m.msg_name = NULL; @@ -193,9 +195,9 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {          goto fail;      } -    memcpy(&header, chunk->memblock->data, sizeof(uint32_t)); -    memcpy(&c->timestamp, (uint8_t*) chunk->memblock->data + 4, sizeof(uint32_t)); -    memcpy(&c->ssrc, (uint8_t*) chunk->memblock->data + 8, sizeof(uint32_t)); +    memcpy(&header, iov.iov_base, sizeof(uint32_t)); +    memcpy(&c->timestamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t)); +    memcpy(&c->ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t));      header = ntohl(header);      c->timestamp = ntohl(c->timestamp); @@ -236,8 +238,10 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {      return 0;  fail: -    if (chunk->memblock) +    if (chunk->memblock) { +        pa_memblock_release(chunk->memblock);          pa_memblock_unref(chunk->memblock); +    }      return -1;  } diff --git a/src/pulse/internal.h b/src/pulse/internal.h index 4eef4b4a..76d80d83 100644 --- a/src/pulse/internal.h +++ b/src/pulse/internal.h @@ -113,6 +113,7 @@ struct pa_stream {      uint32_t requested_bytes;      pa_memchunk peek_memchunk; +    void *peek_data;      pa_memblockq *record_memblockq;      int corked; diff --git a/src/pulse/stream.c b/src/pulse/stream.c index 180cd096..d31127d8 100644 --- a/src/pulse/stream.c +++ b/src/pulse/stream.c @@ -88,6 +88,7 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *      s->peek_memchunk.index = 0;      s->peek_memchunk.length = 0;      s->peek_memchunk.memblock = NULL; +    s->peek_data = NULL;      s->record_memblockq = NULL; @@ -122,8 +123,11 @@ static void stream_free(pa_stream *s) {          s->mainloop->time_free(s->auto_timing_update_event);      } -    if (s->peek_memchunk.memblock) +    if (s->peek_memchunk.memblock) { +        if (s->peek_data) +            pa_memblock_release(s->peek_memchunk.memblock);          pa_memblock_unref(s->peek_memchunk.memblock); +    }      if (s->record_memblockq)          pa_memblockq_free(s->record_memblockq); @@ -605,8 +609,11 @@ int pa_stream_write(      if (free_cb)           chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) data, length, free_cb, 1);      else { +        void *tdata;          chunk.memblock = pa_memblock_new(s->context->mempool, length); -        memcpy(chunk.memblock->data, data, length); +        tdata = pa_memblock_acquire(chunk.memblock); +        memcpy(tdata, data, length); +        pa_memblock_release(chunk.memblock);      }      chunk.index = 0; @@ -672,9 +679,12 @@ int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {              *length = 0;              return 0;          } + +        s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);      } -    *data = (const char*) s->peek_memchunk.memblock->data + s->peek_memchunk.index; +    assert(s->peek_data); +    *data = (uint8_t*) s->peek_data + s->peek_memchunk.index;      *length = s->peek_memchunk.length;      return 0;  } @@ -692,7 +702,9 @@ int pa_stream_drop(pa_stream *s) {      /* Fix the simulated local read index */      if (s->timing_info_valid && !s->timing_info.read_index_corrupt)          s->timing_info.read_index += s->peek_memchunk.length; -     + +    assert(s->peek_data); +    pa_memblock_release(s->peek_memchunk.memblock);      pa_memblock_unref(s->peek_memchunk.memblock);      s->peek_memchunk.length = 0;      s->peek_memchunk.index = 0; diff --git a/src/pulsecore/cli-command.c b/src/pulsecore/cli-command.c index ae475c3a..d7e4a75c 100644 --- a/src/pulsecore/cli-command.c +++ b/src/pulsecore/cli-command.c @@ -259,20 +259,20 @@ static int pa_cli_command_stat(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, PA_G      stat = pa_mempool_get_stat(c->mempool);      pa_strbuf_printf(buf, "Memory blocks currently allocated: %u, size: %s.\n", -                     (unsigned) AO_load_acquire_read((AO_t*) &stat->n_allocated), -                     pa_bytes_snprint(s, sizeof(s), (size_t) AO_load_acquire_read((AO_t*) &stat->allocated_size))); +                     (unsigned) pa_atomic_load(&stat->n_allocated), +                     pa_bytes_snprint(s, sizeof(s), (size_t) pa_atomic_load(&stat->allocated_size)));      pa_strbuf_printf(buf, "Memory blocks allocated during the whole lifetime: %u, size: %s.\n", -                     (unsigned) AO_load_acquire_read((AO_t*) &stat->n_accumulated), -                     pa_bytes_snprint(s, sizeof(s), (size_t) AO_load_acquire_read((AO_t*) &stat->accumulated_size))); +                     (unsigned) pa_atomic_load(&stat->n_accumulated), +                     pa_bytes_snprint(s, sizeof(s), (size_t) pa_atomic_load(&stat->accumulated_size)));      pa_strbuf_printf(buf, "Memory blocks imported from other processes: %u, size: %s.\n", -                     (unsigned) AO_load_acquire_read((AO_t*) &stat->n_imported), -                     pa_bytes_snprint(s, sizeof(s), (size_t) AO_load_acquire_read((AO_t*) &stat->imported_size))); +                     (unsigned) pa_atomic_load(&stat->n_imported), +                     pa_bytes_snprint(s, sizeof(s), (size_t) pa_atomic_load(&stat->imported_size)));      pa_strbuf_printf(buf, "Memory blocks exported to other processes: %u, size: %s.\n", -                     (unsigned) AO_load_acquire_read((AO_t*) &stat->n_exported), -                     pa_bytes_snprint(s, sizeof(s), (size_t) AO_load_acquire_read((AO_t*) &stat->exported_size))); +                     (unsigned) pa_atomic_load(&stat->n_exported), +                     pa_bytes_snprint(s, sizeof(s), (size_t) pa_atomic_load(&stat->exported_size)));      pa_strbuf_printf(buf, "Total sample cache size: %s.\n",                       pa_bytes_snprint(s, sizeof(s), pa_scache_total_size(c))); @@ -289,8 +289,8 @@ static int pa_cli_command_stat(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, PA_G          pa_strbuf_printf(buf,                           "Memory blocks of type %s: %u allocated/%u accumulated.\n",                           type_table[k], -                         (unsigned) AO_load_acquire_read(&stat->n_allocated_by_type[k]), -                         (unsigned) AO_load_acquire_read(&stat->n_accumulated_by_type[k])); +                         (unsigned) pa_atomic_load(&stat->n_allocated_by_type[k]), +                         (unsigned) pa_atomic_load(&stat->n_accumulated_by_type[k]));      return 0;  } diff --git a/src/pulsecore/mcalign.c b/src/pulsecore/mcalign.c index 9ede610d..aa2eae46 100644 --- a/src/pulsecore/mcalign.c +++ b/src/pulsecore/mcalign.c @@ -89,6 +89,7 @@ void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c) {          } else {              size_t l; +            void *lo_data, *m_data;              /* We have to copy */              assert(m->leftover.length < m->base); @@ -100,10 +101,15 @@ void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c) {              /* Can we use the current block? */              pa_memchunk_make_writable(&m->leftover, m->base); -            memcpy((uint8_t*) m->leftover.memblock->data + m->leftover.index + m->leftover.length, (uint8_t*) c->memblock->data + c->index, l); +            lo_data = pa_memblock_acquire(m->leftover.memblock); +            m_data = pa_memblock_acquire(c->memblock); +            memcpy((uint8_t*) lo_data + m->leftover.index + m->leftover.length, (uint8_t*) m_data + c->index, l); +            pa_memblock_release(m->leftover.memblock); +            pa_memblock_release(c->memblock);              m->leftover.length += l; -            assert(m->leftover.length <= m->base && m->leftover.length <= m->leftover.memblock->length); +            assert(m->leftover.length <= m->base); +            assert(m->leftover.length <= pa_memblock_get_length(m->leftover.memblock));              if (c->length > l) {                  /* Save the remainder of the memory block */ diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c index 9cfd79b5..f11a7174 100644 --- a/src/pulsecore/memblock.c +++ b/src/pulsecore/memblock.c @@ -30,10 +30,13 @@  #include <unistd.h>  #include <pulse/xmalloc.h> +#include <pulse/def.h>  #include <pulsecore/shm.h>  #include <pulsecore/log.h>  #include <pulsecore/hashmap.h> +#include <pulsecore/mutex.h> +#include <pulsecore/flist.h>  #include "memblock.h" @@ -45,6 +48,32 @@  #define PA_MEMIMPORT_SLOTS_MAX 128  #define PA_MEMIMPORT_SEGMENTS_MAX 16 +struct pa_memblock { +    PA_REFCNT_DECLARE; /* the reference counter */ +    pa_mempool *pool; + +    pa_memblock_type_t type; +    int read_only; /* boolean */ +     +    pa_atomic_ptr_t data; +    size_t length; + +    pa_atomic_int_t n_acquired; +    pa_atomic_int_t please_signal; + +    union { +        struct { +            /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */ +            pa_free_cb_t free_cb; +        } user; +             +        struct  { +            uint32_t id; +            pa_memimport_segment *segment; +        } imported; +    } per_type; +}; +  struct pa_memimport_segment {      pa_memimport *import;      pa_shm memory; @@ -52,6 +81,8 @@ struct pa_memimport_segment {  };  struct pa_memimport { +    pa_mutex *mutex; +          pa_mempool *pool;      pa_hashmap *segments;      pa_hashmap *blocks; @@ -70,9 +101,11 @@ struct memexport_slot {  };  struct pa_memexport { +    pa_mutex *mutex;      pa_mempool *pool;      struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX]; +      PA_LLIST_HEAD(struct memexport_slot, free_slots);      PA_LLIST_HEAD(struct memexport_slot, used_slots);      unsigned n_init; @@ -92,63 +125,71 @@ struct mempool_slot {  };  struct pa_mempool { +    pa_mutex *mutex; +    pa_cond *cond; +          pa_shm memory;      size_t block_size; -    unsigned n_blocks, n_init; +    unsigned n_blocks; + +    pa_atomic_int_t n_init;      PA_LLIST_HEAD(pa_memimport, imports);      PA_LLIST_HEAD(pa_memexport, exports);      /* A list of free slots that may be reused */ -    PA_LLIST_HEAD(struct mempool_slot, free_slots); +    pa_flist *free_slots;      pa_mempool_stat stat;  };  static void segment_detach(pa_memimport_segment *seg); +/* No lock necessary */  static void stat_add(pa_memblock*b) {      assert(b);      assert(b->pool); -    AO_fetch_and_add1_release_write(&b->pool->stat.n_allocated); -    AO_fetch_and_add_release_write(&b->pool->stat.allocated_size, (AO_t) b->length); +    pa_atomic_inc(&b->pool->stat.n_allocated); +    pa_atomic_add(&b->pool->stat.allocated_size, (int) b->length); -    AO_fetch_and_add1_release_write(&b->pool->stat.n_accumulated); -    AO_fetch_and_add_release_write(&b->pool->stat.accumulated_size, (AO_t) b->length); +    pa_atomic_inc(&b->pool->stat.n_accumulated); +    pa_atomic_add(&b->pool->stat.accumulated_size, (int) b->length);      if (b->type == PA_MEMBLOCK_IMPORTED) { -        AO_fetch_and_add1_release_write(&b->pool->stat.n_imported); -        AO_fetch_and_add_release_write(&b->pool->stat.imported_size, (AO_t) b->length); +        pa_atomic_inc(&b->pool->stat.n_imported); +        pa_atomic_add(&b->pool->stat.imported_size, (int) b->length);      } -    AO_fetch_and_add1_release_write(&b->pool->stat.n_allocated_by_type[b->type]); -    AO_fetch_and_add1_release_write(&b->pool->stat.n_accumulated_by_type[b->type]); +    pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]); +    pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);  } +/* No lock necessary */  static void stat_remove(pa_memblock *b) {      assert(b);      assert(b->pool); -    assert(AO_load_acquire_read(&b->pool->stat.n_allocated) > 0); -    assert(AO_load_acquire_read(&b->pool->stat.allocated_size) >= (AO_t) b->length); +    assert(pa_atomic_load(&b->pool->stat.n_allocated) > 0); +    assert(pa_atomic_load(&b->pool->stat.allocated_size) >= (int) b->length); -    AO_fetch_and_sub1_release_write(&b->pool->stat.n_allocated); -    AO_fetch_and_add_release_write(&b->pool->stat.allocated_size,  (AO_t) (-b->length)); +    pa_atomic_dec(&b->pool->stat.n_allocated); +    pa_atomic_add(&b->pool->stat.allocated_size, - (int) b->length);      if (b->type == PA_MEMBLOCK_IMPORTED) { -        assert(AO_load_acquire_read(&b->pool->stat.n_imported) > 0); -        assert(AO_load_acquire_read(&b->pool->stat.imported_size) >= (AO_t) b->length); +        assert(pa_atomic_load(&b->pool->stat.n_imported) > 0); +        assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length); -        AO_fetch_and_sub1_release_write(&b->pool->stat.n_imported); -        AO_fetch_and_add_release_write(&b->pool->stat.imported_size, (AO_t)  (-b->length)); +        pa_atomic_dec(&b->pool->stat.n_imported); +        pa_atomic_add(&b->pool->stat.imported_size, - (int) b->length);      } -    AO_fetch_and_sub1_release_write(&b->pool->stat.n_allocated_by_type[b->type]); +    pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);  }  static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length); +/* No lock necessary */  pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {      pa_memblock *b; @@ -161,6 +202,7 @@ pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {      return b;  } +/* No lock necessary */  static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {      pa_memblock *b; @@ -168,49 +210,61 @@ static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {      assert(length > 0);      b = pa_xmalloc(sizeof(pa_memblock) + length); +    PA_REFCNT_INIT(b); +    b->pool = p;      b->type = PA_MEMBLOCK_APPENDED;      b->read_only = 0; -    PA_REFCNT_INIT(b); +    pa_atomic_ptr_store(&b->data, (uint8_t*)b + sizeof(pa_memblock));      b->length = length; -    b->data = (uint8_t*) b + sizeof(pa_memblock); -    b->pool = p; - +    pa_atomic_store(&b->n_acquired, 0); +    pa_atomic_store(&b->please_signal, 0); +          stat_add(b);      return b;  } +/* No lock necessary */  static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {      struct mempool_slot *slot;      assert(p); -    if (p->free_slots) { -        slot = p->free_slots; -        PA_LLIST_REMOVE(struct mempool_slot, p->free_slots, slot); -    } else if (p->n_init < p->n_blocks) -        slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * p->n_init++)); -    else { -        pa_log_debug("Pool full"); -        AO_fetch_and_add1_release_write(&p->stat.n_pool_full); -        return NULL; +    if (!(slot = pa_flist_pop(p->free_slots))) { +        int idx; +         +        /* The free list was empty, we have to allocate a new entry */ + +        if ((unsigned) (idx = pa_atomic_inc(&p->n_init)) >= p->n_blocks) +            pa_atomic_dec(&p->n_init); +        else +            slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * idx)); +         +        if (!slot) { +            pa_log_debug("Pool full"); +            pa_atomic_inc(&p->stat.n_pool_full); +        }      }      return slot;  } +/* No lock necessary */  static void* mempool_slot_data(struct mempool_slot *slot) {      assert(slot);      return (uint8_t*) slot + sizeof(struct mempool_slot);  } +/* No lock necessary */  static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {      assert(p); +      assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);      assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);      return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;  } +/* No lock necessary */  static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {      unsigned idx; @@ -220,6 +274,7 @@ static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {      return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));  } +/* No lock necessary */  pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {      pa_memblock *b = NULL;      struct mempool_slot *slot; @@ -234,7 +289,7 @@ pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {          b = mempool_slot_data(slot);          b->type = PA_MEMBLOCK_POOL; -        b->data = (uint8_t*) b + sizeof(pa_memblock); +        pa_atomic_ptr_store(&b->data, (uint8_t*) b + sizeof(pa_memblock));      } else if (p->block_size - sizeof(struct mempool_slot) >= length) { @@ -243,22 +298,26 @@ pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {          b = pa_xnew(pa_memblock, 1);          b->type = PA_MEMBLOCK_POOL_EXTERNAL; -        b->data = mempool_slot_data(slot); +        pa_atomic_ptr_store(&b->data, mempool_slot_data(slot)); +              } else {          pa_log_debug("Memory block too large for pool: %u > %u", length, p->block_size - sizeof(struct mempool_slot)); -        AO_fetch_and_add1_release_write(&p->stat.n_too_large_for_pool); +        pa_atomic_inc(&p->stat.n_too_large_for_pool);          return NULL;      } -    b->length = length; -    b->read_only = 0;      PA_REFCNT_INIT(b);      b->pool = p; +    b->read_only = 0; +    b->length = length; +    pa_atomic_store(&b->n_acquired, 0); +    pa_atomic_store(&b->please_signal, 0);      stat_add(b);      return b;  } +/* No lock necessary */  pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) {      pa_memblock *b; @@ -267,17 +326,20 @@ pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int re      assert(length > 0);      b = pa_xnew(pa_memblock, 1); +    PA_REFCNT_INIT(b); +    b->pool = p;      b->type = PA_MEMBLOCK_FIXED;      b->read_only = read_only; -    PA_REFCNT_INIT(b); +    pa_atomic_ptr_store(&b->data, d);      b->length = length; -    b->data = d; -    b->pool = p; +    pa_atomic_store(&b->n_acquired, 0); +    pa_atomic_store(&b->please_signal, 0);      stat_add(b);      return b;  } +/* No lock necessary */  pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) {      pa_memblock *b; @@ -287,18 +349,72 @@ pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*      assert(free_cb);      b = pa_xnew(pa_memblock, 1); +    PA_REFCNT_INIT(b); +    b->pool = p;      b->type = PA_MEMBLOCK_USER;      b->read_only = read_only; -    PA_REFCNT_INIT(b); +    pa_atomic_ptr_store(&b->data, d);      b->length = length; -    b->data = d; +    pa_atomic_store(&b->n_acquired, 0); +    pa_atomic_store(&b->please_signal, 0); +                  b->per_type.user.free_cb = free_cb; -    b->pool = p;      stat_add(b);      return b;  } +/* No lock necessary */ +int pa_memblock_is_read_only(pa_memblock *b) { +    assert(b); +    assert(PA_REFCNT_VALUE(b) > 0); + +    return b->read_only && PA_REFCNT_VALUE(b) == 1; +} + +/* No lock necessary */ +void* pa_memblock_acquire(pa_memblock *b) { +    assert(b); +    assert(PA_REFCNT_VALUE(b) > 0); + +    pa_atomic_inc(&b->n_acquired); +     +    return pa_atomic_ptr_load(&b->data); +} + +/* No lock necessary, in corner cases locks by its own */ +void pa_memblock_release(pa_memblock *b) { +    int r; +    assert(b); +    assert(PA_REFCNT_VALUE(b) > 0); +     +    r = pa_atomic_dec(&b->n_acquired); +    assert(r >= 1); + +    if (r == 1 && pa_atomic_load(&b->please_signal)) { +        pa_mempool *p = b->pool; +        /* Signal a waiting thread that this memblock is no longer used */ +        pa_mutex_lock(p->mutex); +        pa_cond_signal(p->cond, 1); +        pa_mutex_unlock(p->mutex); +    } +} + +size_t pa_memblock_get_length(pa_memblock *b) { +    assert(b); +    assert(PA_REFCNT_VALUE(b) > 0); + +    return b->length; +} + +pa_mempool* pa_memblock_get_pool(pa_memblock *b) { +    assert(b); +    assert(PA_REFCNT_VALUE(b) > 0); + +    return b->pool; +} + +/* No lock necessary */  pa_memblock* pa_memblock_ref(pa_memblock*b) {      assert(b);      assert(PA_REFCNT_VALUE(b) > 0); @@ -307,19 +423,17 @@ pa_memblock* pa_memblock_ref(pa_memblock*b) {      return b;  } -void pa_memblock_unref(pa_memblock*b) { +static void memblock_free(pa_memblock *b) {      assert(b); -    assert(PA_REFCNT_VALUE(b) > 0); - -    if (PA_REFCNT_DEC(b) > 0) -        return; +    assert(pa_atomic_load(&b->n_acquired) == 0); +      stat_remove(b);      switch (b->type) {          case PA_MEMBLOCK_USER :              assert(b->per_type.user.free_cb); -            b->per_type.user.free_cb(b->data); +            b->per_type.user.free_cb(pa_atomic_ptr_load(&b->data));              /* Fall through */ @@ -330,17 +444,23 @@ void pa_memblock_unref(pa_memblock*b) {          case PA_MEMBLOCK_IMPORTED : {              pa_memimport_segment *segment; - +            pa_memimport *import; +             +            /* FIXME! This should be implemented lock-free */ +                          segment = b->per_type.imported.segment;              assert(segment); -            assert(segment->import); +            import = segment->import; +            assert(import); -            pa_hashmap_remove(segment->import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id)); -            segment->import->release_cb(segment->import, b->per_type.imported.id, segment->import->userdata); - +            pa_mutex_lock(import->mutex); +            pa_hashmap_remove(import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));              if (-- segment->n_blocks <= 0)                  segment_detach(segment); -             +            pa_mutex_unlock(import->mutex); + +            import->release_cb(import, b->per_type.imported.id, import->userdata); +              pa_xfree(b);              break;          } @@ -348,13 +468,20 @@ void pa_memblock_unref(pa_memblock*b) {          case PA_MEMBLOCK_POOL_EXTERNAL:          case PA_MEMBLOCK_POOL: {              struct mempool_slot *slot; +            int call_free; -            slot = mempool_slot_by_ptr(b->pool, b->data); +            slot = mempool_slot_by_ptr(b->pool, pa_atomic_ptr_load(&b->data));              assert(slot); + +            call_free = b->type == PA_MEMBLOCK_POOL_EXTERNAL; + +            /* The free list dimensions should easily allow all slots +             * to fit in, hence try harder if pushing this slot into +             * the free list fails */ +            while (pa_flist_push(b->pool->free_slots, slot) < 0) +                ; -            PA_LLIST_PREPEND(struct mempool_slot, b->pool->free_slots, slot); -             -            if (b->type == PA_MEMBLOCK_POOL_EXTERNAL) +            if (call_free)                  pa_xfree(b);              break; @@ -366,10 +493,42 @@ void pa_memblock_unref(pa_memblock*b) {      }  } +/* No lock necessary */ +void pa_memblock_unref(pa_memblock*b) { +    assert(b); +    assert(PA_REFCNT_VALUE(b) > 0); + +    if (PA_REFCNT_DEC(b) > 0) +        return; + +    memblock_free(b); +} + +/* Self locked */ +static void memblock_wait(pa_memblock *b) { +    assert(b); + +    if (pa_atomic_load(&b->n_acquired) > 0) { +        /* We need to wait until all threads gave up access to the +         * memory block before we can go on. Unfortunately this means +         * that we have to lock and wait here. Sniff! */ + +        pa_atomic_inc(&b->please_signal); + +        pa_mutex_lock(b->pool->mutex); +        while (pa_atomic_load(&b->n_acquired) > 0) +            pa_cond_wait(b->pool->cond, b->pool->mutex); +        pa_mutex_unlock(b->pool->mutex); + +        pa_atomic_dec(&b->please_signal); +    } +} + +/* No lock necessary. This function is not multiple caller safe! */  static void memblock_make_local(pa_memblock *b) {      assert(b); -    AO_fetch_and_sub1_release_write(&b->pool->stat.n_allocated_by_type[b->type]); +    pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);      if (b->length <= b->pool->block_size - sizeof(struct mempool_slot)) {          struct mempool_slot *slot; @@ -378,53 +537,61 @@ static void memblock_make_local(pa_memblock *b) {              void *new_data;              /* We can move it into a local pool, perfect! */ +            new_data = mempool_slot_data(slot); +            memcpy(new_data, pa_atomic_ptr_load(&b->data), b->length); +            pa_atomic_ptr_store(&b->data, new_data); +              b->type = PA_MEMBLOCK_POOL_EXTERNAL;              b->read_only = 0; -            new_data = mempool_slot_data(slot); -            memcpy(new_data, b->data, b->length); -            b->data = new_data;              goto finish;          }      }      /* Humm, not enough space in the pool, so lets allocate the memory with malloc() */ -    b->type = PA_MEMBLOCK_USER;      b->per_type.user.free_cb = pa_xfree; +    pa_atomic_ptr_store(&b->data, pa_xmemdup(pa_atomic_ptr_load(&b->data), b->length)); + +    b->type = PA_MEMBLOCK_USER;      b->read_only = 0; -    b->data = pa_xmemdup(b->data, b->length);  finish: -    AO_fetch_and_add1_release_write(&b->pool->stat.n_allocated_by_type[b->type]); -    AO_fetch_and_add1_release_write(&b->pool->stat.n_accumulated_by_type[b->type]); +    pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]); +    pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]); + +    memblock_wait(b);  } +/* No lock necessary. This function is not multiple caller safe*/  void pa_memblock_unref_fixed(pa_memblock *b) {      assert(b);      assert(PA_REFCNT_VALUE(b) > 0);      assert(b->type == PA_MEMBLOCK_FIXED); -    if (PA_REFCNT_VALUE(b) > 1) +    if (PA_REFCNT_DEC(b) > 0)          memblock_make_local(b); - -    pa_memblock_unref(b); +    else +        memblock_free(b);  } +/* Self-locked. This function is not multiple-caller safe */  static void memblock_replace_import(pa_memblock *b) {      pa_memimport_segment *seg;      assert(b);      assert(b->type == PA_MEMBLOCK_IMPORTED); -    assert(AO_load_acquire_read(&b->pool->stat.n_imported) > 0); -    assert(AO_load_acquire_read(&b->pool->stat.imported_size) >= (AO_t) b->length); -    AO_fetch_and_sub1_release_write(&b->pool->stat.n_imported); -    AO_fetch_and_add_release_write(&b->pool->stat.imported_size, (AO_t) - b->length); +    assert(pa_atomic_load(&b->pool->stat.n_imported) > 0); +    assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length); +    pa_atomic_dec(&b->pool->stat.n_imported); +    pa_atomic_add(&b->pool->stat.imported_size, (int) - b->length);      seg = b->per_type.imported.segment;      assert(seg);      assert(seg->import); +    pa_mutex_lock(seg->import->mutex); +          pa_hashmap_remove(              seg->import->blocks,              PA_UINT32_TO_PTR(b->per_type.imported.id)); @@ -433,6 +600,8 @@ static void memblock_replace_import(pa_memblock *b) {      if (-- seg->n_blocks <= 0)          segment_detach(seg); + +    pa_mutex_unlock(seg->import->mutex);  }  pa_mempool* pa_mempool_new(int shared) { @@ -441,12 +610,15 @@ pa_mempool* pa_mempool_new(int shared) {      p = pa_xnew(pa_mempool, 1); +    p->mutex = pa_mutex_new(1); +    p->cond = pa_cond_new(); +  #ifdef HAVE_SYSCONF      ps = (size_t) sysconf(_SC_PAGESIZE);  #elif defined(PAGE_SIZE) -	ps = (size_t) PAGE_SIZE; +    ps = (size_t) PAGE_SIZE;  #else -	ps = 4096; /* Let's hope it's like x86. */ +    ps = 4096; /* Let's hope it's like x86. */  #endif      p->block_size = (PA_MEMPOOL_SLOT_SIZE/ps)*ps; @@ -463,13 +635,13 @@ pa_mempool* pa_mempool_new(int shared) {          return NULL;      } -    p->n_init = 0; +    memset(&p->stat, 0, sizeof(p->stat)); +    pa_atomic_store(&p->n_init, 0);      PA_LLIST_HEAD_INIT(pa_memimport, p->imports);      PA_LLIST_HEAD_INIT(pa_memexport, p->exports); -    PA_LLIST_HEAD_INIT(struct mempool_slot, p->free_slots); -    memset(&p->stat, 0, sizeof(p->stat)); +    p->free_slots = pa_flist_new(p->n_blocks*2);      return p;  } @@ -477,34 +649,62 @@ pa_mempool* pa_mempool_new(int shared) {  void pa_mempool_free(pa_mempool *p) {      assert(p); +    pa_mutex_lock(p->mutex); +      while (p->imports)          pa_memimport_free(p->imports);      while (p->exports)          pa_memexport_free(p->exports); -    if (AO_load_acquire_read(&p->stat.n_allocated) > 0) +    pa_mutex_unlock(p->mutex); + +    if (pa_atomic_load(&p->stat.n_allocated) > 0)          pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed!"); + +    pa_flist_free(p->free_slots, NULL);      pa_shm_free(&p->memory); + +    pa_mutex_free(p->mutex); +    pa_cond_free(p->cond); +          pa_xfree(p);  } +/* No lock necessary */  const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {      assert(p);      return &p->stat;  } +/* No lock necessary */  void pa_mempool_vacuum(pa_mempool *p) {      struct mempool_slot *slot; +    pa_flist *list;      assert(p); -    for (slot = p->free_slots; slot; slot = slot->next) -        pa_shm_punch(&p->memory, (uint8_t*) slot + sizeof(struct mempool_slot) - (uint8_t*) p->memory.ptr, p->block_size - sizeof(struct mempool_slot)); +    list = pa_flist_new(p->n_blocks*2); + +    while ((slot = pa_flist_pop(p->free_slots))) +        while (pa_flist_push(list, slot) < 0) +            ; + +    while ((slot = pa_flist_pop(list))) { +        pa_shm_punch(&p->memory, +                     (uint8_t*) slot - (uint8_t*) p->memory.ptr + sizeof(struct mempool_slot), +                     p->block_size - sizeof(struct mempool_slot)); + +        while (pa_flist_push(p->free_slots, slot)) +            ; +    } + +    pa_flist_free(list, NULL);  } +/* No lock necessary */  int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {      assert(p); @@ -516,6 +716,7 @@ int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {      return 0;  } +/* No lock necessary */  int pa_mempool_is_shared(pa_mempool *p) {      assert(p); @@ -530,18 +731,23 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void      assert(cb);      i = pa_xnew(pa_memimport, 1); +    i->mutex = pa_mutex_new(0);      i->pool = p;      i->segments = pa_hashmap_new(NULL, NULL);      i->blocks = pa_hashmap_new(NULL, NULL);      i->release_cb = cb;      i->userdata = userdata; -     + +    pa_mutex_lock(p->mutex);      PA_LLIST_PREPEND(pa_memimport, p->imports, i); +    pa_mutex_unlock(p->mutex); +      return i;  }  static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i); +/* Should be called locked */  static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {      pa_memimport_segment* seg; @@ -562,6 +768,7 @@ static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {      return seg;  } +/* Should be called locked */  static void segment_detach(pa_memimport_segment *seg) {      assert(seg); @@ -570,51 +777,68 @@ static void segment_detach(pa_memimport_segment *seg) {      pa_xfree(seg);  } +/* Self-locked. Not multiple-caller safe */  void pa_memimport_free(pa_memimport *i) {      pa_memexport *e;      pa_memblock *b;      assert(i); -    /* If we've exported this block further we need to revoke that export */ -    for (e = i->pool->exports; e; e = e->next) -        memexport_revoke_blocks(e, i); +    pa_mutex_lock(i->mutex);      while ((b = pa_hashmap_get_first(i->blocks)))          memblock_replace_import(b);      assert(pa_hashmap_size(i->segments) == 0); +     +    pa_mutex_unlock(i->mutex); + +    pa_mutex_lock(i->pool->mutex); + +    /* If we've exported this block further we need to revoke that export */ +    for (e = i->pool->exports; e; e = e->next) +        memexport_revoke_blocks(e, i); +    PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i); +     +    pa_mutex_unlock(i->pool->mutex); +          pa_hashmap_free(i->blocks, NULL, NULL);      pa_hashmap_free(i->segments, NULL, NULL); + +    pa_mutex_free(i->mutex); -    PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);      pa_xfree(i);  } +/* Self-locked */  pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) { -    pa_memblock *b; +    pa_memblock *b = NULL;      pa_memimport_segment *seg;      assert(i); +    pa_mutex_lock(i->mutex); +          if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX) -        return NULL; +        goto finish;      if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))           if (!(seg = segment_attach(i, shm_id))) -            return NULL; +            goto finish;      if (offset+size > seg->memory.size) -        return NULL; -     +        goto finish; +      b = pa_xnew(pa_memblock, 1); +    PA_REFCNT_INIT(b); +    b->pool = i->pool;      b->type = PA_MEMBLOCK_IMPORTED;      b->read_only = 1; -    PA_REFCNT_INIT(b); +    pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);      b->length = size; -    b->data = (uint8_t*) seg->memory.ptr + offset; -    b->pool = i->pool; +    pa_atomic_store(&b->n_acquired, 0); +    pa_atomic_store(&b->please_signal, 0);      b->per_type.imported.id = block_id;      b->per_type.imported.segment = seg; @@ -622,7 +846,11 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i      seg->n_blocks++; -    stat_add(b); +finish: +    pa_mutex_unlock(i->mutex); + +    if (b) +        stat_add(b);      return b;  } @@ -631,10 +859,15 @@ int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {      pa_memblock *b;      assert(i); +    pa_mutex_lock(i->mutex); +      if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id))))          return -1;      memblock_replace_import(b); + +    pa_mutex_unlock(i->mutex); +      return 0;  } @@ -649,58 +882,84 @@ pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void          return NULL;      e = pa_xnew(pa_memexport, 1); +    e->mutex = pa_mutex_new(1);      e->pool = p;      PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);      PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);      e->n_init = 0;      e->revoke_cb = cb;      e->userdata = userdata; -     + +    pa_mutex_lock(p->mutex);      PA_LLIST_PREPEND(pa_memexport, p->exports, e); +    pa_mutex_unlock(p->mutex); +          return e;  }  void pa_memexport_free(pa_memexport *e) {      assert(e); +    pa_mutex_lock(e->mutex);      while (e->used_slots)          pa_memexport_process_release(e, e->used_slots - e->slots); +    pa_mutex_unlock(e->mutex); +    pa_mutex_lock(e->pool->mutex);      PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e); +    pa_mutex_unlock(e->pool->mutex); +          pa_xfree(e);  } +/* Self-locked */  int pa_memexport_process_release(pa_memexport *e, uint32_t id) { +    pa_memblock *b; +          assert(e); +    pa_mutex_lock(e->mutex); +          if (id >= e->n_init) -        return -1; +        goto fail;      if (!e->slots[id].block) -        return -1; +        goto fail; -/*     pa_log("Processing release for %u", id); */ - -    assert(AO_load_acquire_read(&e->pool->stat.n_exported) > 0); -    assert(AO_load_acquire_read(&e->pool->stat.exported_size) >= (AO_t) e->slots[id].block->length); -     -    AO_fetch_and_sub1_release_write(&e->pool->stat.n_exported); -    AO_fetch_and_add_release_write(&e->pool->stat.exported_size, (AO_t) -e->slots[id].block->length); -     -    pa_memblock_unref(e->slots[id].block); +    b = e->slots[id].block;      e->slots[id].block = NULL;      PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]);      PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]); +    pa_mutex_unlock(e->mutex); +     +/*     pa_log("Processing release for %u", id); */ + +    assert(pa_atomic_load(&e->pool->stat.n_exported) > 0); +    assert(pa_atomic_load(&e->pool->stat.exported_size) >= (int) b->length); +     +    pa_atomic_dec(&e->pool->stat.n_exported); +    pa_atomic_add(&e->pool->stat.exported_size, (int) -b->length); +     +    pa_memblock_unref(b); +      return 0; + +fail: +    pa_mutex_unlock(e->mutex); +     +    return -1;  } +/* Self-locked */  static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {      struct memexport_slot *slot, *next;      assert(e);      assert(i); +    pa_mutex_lock(e->mutex); +      for (slot = e->used_slots; slot; slot = next) {          uint32_t idx;          next = slot->next; @@ -713,8 +972,11 @@ static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {          e->revoke_cb(e, idx, e->userdata);          pa_memexport_process_release(e, idx);      } + +    pa_mutex_unlock(e->mutex);  } +/* No lock necessary */  static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {      pa_memblock *n; @@ -731,13 +993,16 @@ static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {      if (!(n = pa_memblock_new_pool(p, b->length)))          return NULL; -    memcpy(n->data, b->data, b->length); +    memcpy(pa_atomic_ptr_load(&n->data), pa_atomic_ptr_load(&b->data), b->length);      return n;  } +/* Self-locked */  int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {      pa_shm *memory;      struct memexport_slot *slot; +    void *data; +    size_t length;      assert(e);      assert(b); @@ -750,12 +1015,15 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32      if (!(b = memblock_shared_copy(e->pool, b)))          return -1; +    pa_mutex_lock(e->mutex); +          if (e->free_slots) {          slot = e->free_slots;          PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot); -    } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX) { +    } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX)           slot = &e->slots[e->n_init++]; -    } else { +    else { +        pa_mutex_unlock(e->mutex);          pa_memblock_unref(b);          return -1;      } @@ -764,8 +1032,11 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32      slot->block = b;      *block_id = slot - e->slots; +    pa_mutex_unlock(e->mutex);  /*     pa_log("Got block id %u", *block_id); */ +    data = pa_memblock_acquire(b); +      if (b->type == PA_MEMBLOCK_IMPORTED) {          assert(b->per_type.imported.segment);          memory = &b->per_type.imported.segment->memory; @@ -775,15 +1046,17 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32          memory = &b->pool->memory;      } -    assert(b->data >= memory->ptr); -    assert((uint8_t*) b->data + b->length <= (uint8_t*) memory->ptr + memory->size); +    assert(data >= memory->ptr); +    assert((uint8_t*) data + length <= (uint8_t*) memory->ptr + memory->size);      *shm_id = memory->id; -    *offset = (uint8_t*) b->data - (uint8_t*) memory->ptr; -    *size = b->length; +    *offset = (uint8_t*) data - (uint8_t*) memory->ptr; +    *size = length; -    AO_fetch_and_add1_release_write(&e->pool->stat.n_exported); -    AO_fetch_and_add_release_write(&e->pool->stat.exported_size, (AO_t) b->length); +    pa_memblock_release(b); +     +    pa_atomic_inc(&e->pool->stat.n_exported); +    pa_atomic_add(&e->pool->stat.exported_size, (int) length);      return 0;  } diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h index d4f2b7aa..9937818f 100644 --- a/src/pulsecore/memblock.h +++ b/src/pulsecore/memblock.h @@ -25,6 +25,7 @@  #include <sys/types.h>  #include <inttypes.h> +#include <pulse/def.h>  #include <pulsecore/llist.h>  #include <pulsecore/refcnt.h> @@ -54,45 +55,25 @@ typedef struct pa_memexport pa_memexport;  typedef void (*pa_memimport_release_cb_t)(pa_memimport *i, uint32_t block_id, void *userdata);  typedef void (*pa_memexport_revoke_cb_t)(pa_memexport *e, uint32_t block_id, void *userdata); -struct pa_memblock { -    pa_memblock_type_t type; -    int read_only; /* boolean */ -    PA_REFCNT_DECLARE; /* the reference counter */ -    size_t length; -    void *data; -    pa_mempool *pool; - -    union { -        struct { -            void (*free_cb)(void *p);  /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */ -        } user; -             -        struct  { -            uint32_t id; -            pa_memimport_segment *segment; -        } imported; -    } per_type; -}; -  /* Please note that updates to this structure are not locked,   * i.e. n_allocated might be updated at a point in time where   * n_accumulated is not yet. Take these values with a grain of salt, - * threy are here for purely statistical reasons.*/ + * they are here for purely statistical reasons.*/  struct pa_mempool_stat { -    AO_t n_allocated; -    AO_t n_accumulated; -    AO_t n_imported; -    AO_t n_exported; -    AO_t allocated_size; -    AO_t accumulated_size; -    AO_t imported_size; -    AO_t exported_size; - -    AO_t n_too_large_for_pool; -    AO_t n_pool_full; - -    AO_t n_allocated_by_type[PA_MEMBLOCK_TYPE_MAX]; -    AO_t n_accumulated_by_type[PA_MEMBLOCK_TYPE_MAX]; +    pa_atomic_int_t n_allocated; +    pa_atomic_int_t n_accumulated; +    pa_atomic_int_t n_imported; +    pa_atomic_int_t n_exported; +    pa_atomic_int_t allocated_size; +    pa_atomic_int_t accumulated_size; +    pa_atomic_int_t imported_size; +    pa_atomic_int_t exported_size; + +    pa_atomic_int_t n_too_large_for_pool; +    pa_atomic_int_t n_pool_full; + +    pa_atomic_int_t n_allocated_by_type[PA_MEMBLOCK_TYPE_MAX]; +    pa_atomic_int_t n_accumulated_by_type[PA_MEMBLOCK_TYPE_MAX];  };  /* Allocate a new memory block of type PA_MEMBLOCK_MEMPOOL or PA_MEMBLOCK_APPENDED, depending on the size */ @@ -116,9 +97,17 @@ pa_memblock* pa_memblock_ref(pa_memblock*b);  /* This special unref function has to be called by the owner of the  memory of a static memory block when he wants to release all  references to the memory. This causes the memory to be copied and -converted into a PA_MEMBLOCK_DYNAMIC type memory block */ +converted into a pool or malloc'ed memory block. Please note that this +function is not multiple caller safe, i.e. needs to be locked +manually if called from more than one thread at the same time.  */  void pa_memblock_unref_fixed(pa_memblock*b); +int pa_memblock_is_read_only(pa_memblock *b); +void* pa_memblock_acquire(pa_memblock *b); +void pa_memblock_release(pa_memblock *b); +size_t pa_memblock_get_length(pa_memblock *b); +pa_mempool * pa_memblock_get_pool(pa_memblock *b); +  /* The memory block manager */  pa_mempool* pa_mempool_new(int shared);  void pa_mempool_free(pa_mempool *p); diff --git a/src/pulsecore/memblockq.c b/src/pulsecore/memblockq.c index e6b73fc5..dab44dc3 100644 --- a/src/pulsecore/memblockq.c +++ b/src/pulsecore/memblockq.c @@ -176,7 +176,7 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {      assert(uchunk);      assert(uchunk->memblock);      assert(uchunk->length > 0); -    assert(uchunk->index + uchunk->length <= uchunk->memblock->length); +    assert(uchunk->index + uchunk->length <= pa_memblock_get_length(uchunk->memblock));      if (uchunk->length % bq->base)          return -1; @@ -360,8 +360,8 @@ int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) {          if (bq->silence) {              chunk->memblock = pa_memblock_ref(bq->silence); -            if (!length || length > chunk->memblock->length) -                length = chunk->memblock->length; +            if (!length || length > pa_memblock_get_length(chunk->memblock)) +                length = pa_memblock_get_length(chunk->memblock);              chunk->length = length;          } else { @@ -413,8 +413,8 @@ void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length              if (bq->silence) { -                if (!l || l > bq->silence->length) -                    l = bq->silence->length; +                if (!l || l > pa_memblock_get_length(bq->silence)) +                    l = pa_memblock_get_length(bq->silence);              } diff --git a/src/pulsecore/memchunk.c b/src/pulsecore/memchunk.c index 1dbad2b9..55c4bfa7 100644 --- a/src/pulsecore/memchunk.c +++ b/src/pulsecore/memchunk.c @@ -35,22 +35,25 @@  void pa_memchunk_make_writable(pa_memchunk *c, size_t min) {      pa_memblock *n;      size_t l; +    void *tdata, *sdata;      assert(c);      assert(c->memblock); -    assert(PA_REFCNT_VALUE(c->memblock) > 0); -    if (PA_REFCNT_VALUE(c->memblock) == 1 && -        !c->memblock->read_only && -        c->memblock->length >= c->index+min) +    if (pa_memblock_is_read_only(c->memblock) && +        pa_memblock_get_length(c->memblock) >= c->index+min)          return;      l = c->length;      if (l < min)          l = min; -    n = pa_memblock_new(c->memblock->pool, l); -    memcpy(n->data, (uint8_t*) c->memblock->data + c->index, c->length); +    n = pa_memblock_new(pa_memblock_get_pool(c->memblock), l); +    tdata = pa_memblock_acquire(n); +    sdata = pa_memblock_acquire(c->memblock); +    memcpy(tdata, (uint8_t*) sdata + c->index, c->length); +    pa_memblock_release(n); +    pa_memblock_release(c->memblock);      pa_memblock_unref(c->memblock);      c->memblock = n;      c->index = 0; diff --git a/src/pulsecore/play-memchunk.c b/src/pulsecore/play-memchunk.c index cde6a9ee..b711c98c 100644 --- a/src/pulsecore/play-memchunk.c +++ b/src/pulsecore/play-memchunk.c @@ -55,7 +55,7 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {      if (c->length <= 0)          return -1; -    assert(c->memblock && c->memblock->length); +    assert(c->memblock);      *chunk = *c;      pa_memblock_ref(c->memblock); diff --git a/src/pulsecore/protocol-esound.c b/src/pulsecore/protocol-esound.c index 80aeb27b..65b93eb4 100644 --- a/src/pulsecore/protocol-esound.c +++ b/src/pulsecore/protocol-esound.c @@ -891,14 +891,22 @@ static int do_read(struct connection *c) {          }      } else if (c->state == ESD_CACHING_SAMPLE) {          ssize_t r; +        void *p; -        assert(c->scache.memchunk.memblock && c->scache.name && c->scache.memchunk.index < c->scache.memchunk.length); +        assert(c->scache.memchunk.memblock); +        assert(c->scache.name); +        assert(c->scache.memchunk.index < c->scache.memchunk.length); + +        p = pa_memblock_acquire(c->scache.memchunk.memblock); -        if ((r = pa_iochannel_read(c->io, (uint8_t*) c->scache.memchunk.memblock->data+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index)) <= 0) { +        if ((r = pa_iochannel_read(c->io, (uint8_t*) p+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index)) <= 0) { +            pa_memblock_release(c->scache.memchunk.memblock);              pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");              return -1;          } +        pa_memblock_release(c->scache.memchunk.memblock); +                  c->scache.memchunk.index += r;          assert(c->scache.memchunk.index <= c->scache.memchunk.length); @@ -925,6 +933,7 @@ static int do_read(struct connection *c) {          pa_memchunk chunk;          ssize_t r;          size_t l; +        void *p;          assert(c->input_memblockq); @@ -937,7 +946,7 @@ static int do_read(struct connection *c) {              l = c->playback.fragment_size;          if (c->playback.current_memblock)  -            if (c->playback.current_memblock->length - c->playback.memblock_index < l) { +            if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) {                  pa_memblock_unref(c->playback.current_memblock);                  c->playback.current_memblock = NULL;                  c->playback.memblock_index = 0; @@ -945,15 +954,21 @@ static int do_read(struct connection *c) {          if (!c->playback.current_memblock) {              c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2); -            assert(c->playback.current_memblock && c->playback.current_memblock->length >= l); +            assert(c->playback.current_memblock); +            assert(pa_memblock_get_length(c->playback.current_memblock) >= l);              c->playback.memblock_index = 0;          } -        if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) { +        p = pa_memblock_acquire(c->playback.current_memblock); +         +        if ((r = pa_iochannel_read(c->io, (uint8_t*) p+c->playback.memblock_index, l)) <= 0) { +            pa_memblock_release(c->playback.current_memblock);              pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");              return -1;          } -         + +        pa_memblock_release(c->playback.current_memblock); +          chunk.memblock = c->playback.current_memblock;          chunk.index = c->playback.memblock_index;          chunk.length = r; @@ -990,19 +1005,26 @@ static int do_write(struct connection *c) {      } else if (c->state == ESD_STREAMING_DATA && c->source_output) {          pa_memchunk chunk;          ssize_t r; +        void *p;          assert(c->output_memblockq);          if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)              return 0; -        assert(chunk.memblock && chunk.length); +        assert(chunk.memblock); +        assert(chunk.length); + +        p = pa_memblock_acquire(chunk.memblock); -        if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) { +        if ((r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length)) < 0) { +            pa_memblock_release(chunk.memblock);              pa_memblock_unref(chunk.memblock);              pa_log("write(): %s", pa_cstrerror(errno));              return -1;          } +        pa_memblock_release(chunk.memblock); +          pa_memblockq_drop(c->output_memblockq, &chunk, r);          pa_memblock_unref(chunk.memblock); diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c index 38c024b7..fba611d7 100644 --- a/src/pulsecore/protocol-native.c +++ b/src/pulsecore/protocol-native.c @@ -2274,6 +2274,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o      } else {          struct upload_stream *u = (struct upload_stream*) stream;          size_t l; +                  assert(u->type == UPLOAD_STREAM);          if (!u->memchunk.memblock) { @@ -2293,9 +2294,18 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o          if (l > chunk->length)              l = chunk->length; +                  if (l > 0) { -            memcpy((uint8_t*) u->memchunk.memblock->data + u->memchunk.index + u->memchunk.length, -                   (uint8_t*) chunk->memblock->data+chunk->index, l); +            void *src, *dst; +            dst = pa_memblock_acquire(u->memchunk.memblock); +            src = pa_memblock_acquire(chunk->memblock); +             +            memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length, +                   (uint8_t*) src+chunk->index, l); + +            pa_memblock_release(u->memchunk.memblock); +            pa_memblock_release(chunk->memblock); +                          u->memchunk.length += l;              u->length -= l;          } diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 6bfba875..bf203e42 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -113,6 +113,7 @@ static int do_read(struct connection *c) {      pa_memchunk chunk;      ssize_t r;      size_t l; +    void *p;      if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq)))          return 0; @@ -121,7 +122,7 @@ static int do_read(struct connection *c) {          l = c->playback.fragment_size;      if (c->playback.current_memblock)  -        if (c->playback.current_memblock->length - c->playback.memblock_index < l) { +        if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) {              pa_memblock_unref(c->playback.current_memblock);              c->playback.current_memblock = NULL;              c->playback.memblock_index = 0; @@ -129,15 +130,20 @@ static int do_read(struct connection *c) {      if (!c->playback.current_memblock) {          c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2); -        assert(c->playback.current_memblock && c->playback.current_memblock->length >= l); +        assert(c->playback.current_memblock); +        assert(pa_memblock_get_length(c->playback.current_memblock) >= l);          c->playback.memblock_index = 0;      } + +    p = pa_memblock_acquire(c->playback.current_memblock); -    if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) { +    if ((r = pa_iochannel_read(c->io, (uint8_t*) p + c->playback.memblock_index, l)) <= 0) { +        pa_memblock_release(c->playback.current_memblock);          pa_log_debug("read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno));          return -1;      } +    pa_memblock_release(c->playback.current_memblock);      chunk.memblock = c->playback.current_memblock;      chunk.index = c->playback.memblock_index;      chunk.length = r; @@ -156,7 +162,8 @@ static int do_read(struct connection *c) {  static int do_write(struct connection *c) {      pa_memchunk chunk;      ssize_t r; - +    void *p; +          if (!c->source_output)          return 0;     @@ -165,12 +172,17 @@ static int do_write(struct connection *c) {          return 0;      assert(chunk.memblock && chunk.length); + +    p = pa_memblock_acquire(chunk.memblock); -    if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) { +    if ((r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length)) < 0) { +        pa_memblock_release(chunk.memblock);          pa_memblock_unref(chunk.memblock);          pa_log("write(): %s", pa_cstrerror(errno));          return -1;      } + +    pa_memblock_release(chunk.memblock);      pa_memblockq_drop(c->output_memblockq, &chunk, r);      pa_memblock_unref(chunk.memblock); diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c index 566fb060..33963796 100644 --- a/src/pulsecore/pstream.c +++ b/src/pulsecore/pstream.c @@ -48,6 +48,7 @@  #include <pulsecore/creds.h>  #include <pulsecore/mutex.h>  #include <pulsecore/refcnt.h> +#include <pulsecore/anotify.h>  #include "pstream.h" @@ -113,10 +114,11 @@ struct pa_pstream {      PA_REFCNT_DECLARE;      pa_mainloop_api *mainloop; -    pa_defer_event *defer_event;      pa_iochannel *io; +      pa_queue *send_queue; -    pa_mutex *mutex; +    pa_mutex *mutex;  /* only for access to the queue */ +    pa_anotify *anotify;      int dead; @@ -126,6 +128,7 @@ struct pa_pstream {          uint32_t shm_info[PA_PSTREAM_SHM_MAX];          void *data;          size_t index; +        pa_memchunk memchunk;      } write;      struct { @@ -170,10 +173,6 @@ static void do_something(pa_pstream *p) {      pa_pstream_ref(p); -    pa_mutex_lock(p->mutex); -     -    p->mainloop->defer_enable(p->defer_event, 0); -      if (!p->dead && pa_iochannel_is_readable(p->io)) {          if (do_read(p) < 0)              goto fail; @@ -185,8 +184,6 @@ static void do_something(pa_pstream *p) {              goto fail;      } -    pa_mutex_unlock(p->mutex); -      pa_pstream_unref(p);      return; @@ -197,8 +194,6 @@ fail:      if (p->die_callback)          p->die_callback(p, p->die_callback_userdata); -    pa_mutex_unlock(p->mutex); -          pa_pstream_unref(p);  } @@ -211,13 +206,10 @@ static void io_callback(pa_iochannel*io, void *userdata) {      do_something(p);  } -static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) { +static void anotify_callback(uint8_t event, void *userdata) {      pa_pstream *p = userdata;      assert(p); -    assert(p->defer_event == e); -    assert(p->mainloop == m); -          do_something(p);  } @@ -237,16 +229,16 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo      p->dead = 0;      p->mutex = pa_mutex_new(1); +    p->anotify = pa_anotify_new(m, anotify_callback, p);      p->mainloop = m; -    p->defer_event = m->defer_new(m, defer_callback, p); -    m->defer_enable(p->defer_event, 0);      p->send_queue = pa_queue_new();      assert(p->send_queue);      p->write.current = NULL;      p->write.index = 0; +    pa_memchunk_reset(&p->write.memchunk);      p->read.memblock = NULL;      p->read.packet = NULL;      p->read.index = 0; @@ -309,9 +301,15 @@ static void pstream_free(pa_pstream *p) {      if (p->read.packet)          pa_packet_unref(p->read.packet); +    if (p->write.memchunk.memblock) +        pa_memblock_unref(p->write.memchunk.memblock); +      if (p->mutex)          pa_mutex_free(p->mutex); +    if (p->anotify) +        pa_anotify_free(p->anotify); +      pa_xfree(p);  } @@ -322,11 +320,6 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre      assert(PA_REFCNT_VALUE(p) > 0);      assert(packet); -    pa_mutex_lock(p->mutex); -     -    if (p->dead) -        goto finish; -          i = pa_xnew(struct item_info, 1);      i->type = PA_PSTREAM_ITEM_PACKET;      i->packet = pa_packet_ref(packet); @@ -336,12 +329,11 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre          i->creds = *creds;  #endif +    pa_mutex_lock(p->mutex);      pa_queue_push(p->send_queue, i); -    p->mainloop->defer_enable(p->defer_event, 1); - -finish: -      pa_mutex_unlock(p->mutex); +     +    pa_anotify_signal(p->anotify, 0);  }  void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) { @@ -352,12 +344,6 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa      assert(channel != (uint32_t) -1);      assert(chunk); -    pa_mutex_lock(p->mutex); -     -    if (p->dead) -        goto finish; - -    length = chunk->length;      idx = 0;      while (length > 0) { @@ -379,17 +365,15 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa          i->with_creds = 0;  #endif +        pa_mutex_lock(p->mutex);          pa_queue_push(p->send_queue, i); +        pa_mutex_unlock(p->mutex);          idx += n;          length -= n;      } -         -    p->mainloop->defer_enable(p->defer_event, 1); -finish: -     -    pa_mutex_unlock(p->mutex); +    pa_anotify_signal(p->anotify, 0);  }  static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) { @@ -399,11 +383,6 @@ static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userd      assert(p);      assert(PA_REFCNT_VALUE(p) > 0); -    pa_mutex_lock(p->mutex); -     -    if (p->dead) -        goto finish; -  /*     pa_log("Releasing block %u", block_id); */      item = pa_xnew(struct item_info, 1); @@ -413,12 +392,11 @@ static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userd      item->with_creds = 0;  #endif +    pa_mutex_lock(p->mutex);      pa_queue_push(p->send_queue, item); -    p->mainloop->defer_enable(p->defer_event, 1); - -finish: -      pa_mutex_unlock(p->mutex); + +    pa_anotify_signal(p->anotify, 0);  }  static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) { @@ -428,11 +406,6 @@ static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userda      assert(p);      assert(PA_REFCNT_VALUE(p) > 0); -    pa_mutex_lock(p->mutex); -     -    if (p->dead) -        goto finish; -  /*     pa_log("Revoking block %u", block_id); */      item = pa_xnew(struct item_info, 1); @@ -442,23 +415,27 @@ static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userda      item->with_creds = 0;  #endif +    pa_mutex_lock(p->mutex);      pa_queue_push(p->send_queue, item); -    p->mainloop->defer_enable(p->defer_event, 1); - -finish: -      pa_mutex_unlock(p->mutex); + +    pa_anotify_signal(p->anotify, 0);  }  static void prepare_next_write_item(pa_pstream *p) {      assert(p);      assert(PA_REFCNT_VALUE(p) > 0); -    if (!(p->write.current = pa_queue_pop(p->send_queue))) +    pa_mutex_lock(p->mutex); +    p->write.current = pa_queue_pop(p->send_queue); +    pa_mutex_unlock(p->mutex); + +    if (!p->write.current)          return;      p->write.index = 0;      p->write.data = NULL; +    pa_memchunk_reset(&p->write.memchunk);      p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;      p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1); @@ -525,7 +502,9 @@ static void prepare_next_write_item(pa_pstream *p) {          if (send_payload) {              p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length); -            p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index; +            p->write.memchunk = p->write.current->chunk; +            pa_memblock_ref(p->write.memchunk.memblock); +            p->write.data = NULL;          }          p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags); @@ -541,6 +520,7 @@ static int do_write(pa_pstream *p) {      void *d;      size_t l;      ssize_t r; +    pa_memblock *release_memblock = NULL;      assert(p);      assert(PA_REFCNT_VALUE(p) > 0); @@ -555,9 +535,16 @@ static int do_write(pa_pstream *p) {          d = (uint8_t*) p->write.descriptor + p->write.index;          l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;      } else { -        assert(p->write.data); +        assert(p->write.data || p->write.memchunk.memblock); + +        if (p->write.data) +            d = p->write.data; +        else { +            d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index; +            release_memblock = p->write.memchunk.memblock; +        } -        d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE; +        d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;          l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);      } @@ -567,14 +554,17 @@ static int do_write(pa_pstream *p) {      if (p->send_creds_now) {          if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0) -            return -1; +            goto fail;          p->send_creds_now = 0;      } else  #endif      if ((r = pa_iochannel_write(p->io, d, l)) < 0) -        return -1; +        goto fail; + +    if (release_memblock) +        pa_memblock_release(release_memblock);      p->write.index += r; @@ -588,12 +578,20 @@ static int do_write(pa_pstream *p) {      }      return 0; + +fail: + +    if (release_memblock) +        pa_memblock_release(release_memblock); +     +    return -1;  }  static int do_read(pa_pstream *p) {      void *d;      size_t l;       ssize_t r; +    pa_memblock *release_memblock = NULL;      assert(p);      assert(PA_REFCNT_VALUE(p) > 0); @@ -602,8 +600,16 @@ static int do_read(pa_pstream *p) {          d = (uint8_t*) p->read.descriptor + p->read.index;          l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;      } else { -        assert(p->read.data); -        d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE; +        assert(p->read.data || p->read.memblock); + +        if (p->read.data) +            d = p->read.data; +        else { +            d = pa_memblock_acquire(p->read.memblock); +            release_memblock = p->read.memblock; +        } +         +        d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;          l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);      } @@ -612,14 +618,17 @@ static int do_read(pa_pstream *p) {          int b = 0;          if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0) -            return -1; +            goto fail;          p->read_creds_valid = p->read_creds_valid || b;      }  #else      if ((r = pa_iochannel_read(p->io, d, l)) <= 0) -        return -1; +        goto fail;  #endif + +    if (release_memblock) +        pa_memblock_release(release_memblock);      p->read.index += r; @@ -701,7 +710,7 @@ static int do_read(pa_pstream *p) {                  /* Frame is a memblock frame */                  p->read.memblock = pa_memblock_new(p->mempool, length); -                p->read.data = p->read.memblock->data; +                p->read.data = NULL;              } else {                  pa_log_warn("Recieved memblock frame with invalid flags value."); @@ -788,7 +797,7 @@ static int do_read(pa_pstream *p) {                      chunk.memblock = b;                      chunk.index = 0; -                    chunk.length = b->length; +                    chunk.length = pa_memblock_get_length(b);                      offset = (int64_t) (                              (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) | @@ -816,52 +825,51 @@ frame_done:      p->read.memblock = NULL;      p->read.packet = NULL;      p->read.index = 0; +    p->read.data = NULL;  #ifdef HAVE_CREDS      p->read_creds_valid = 0;  #endif      return 0; + +fail: +    if (release_memblock) +        pa_memblock_release(release_memblock); + +    return -1;  }  void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {      assert(p);      assert(PA_REFCNT_VALUE(p) > 0); -    pa_mutex_lock(p->mutex);      p->die_callback = cb;      p->die_callback_userdata = userdata; -    pa_mutex_unlock(p->mutex);  }  void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {      assert(p);      assert(PA_REFCNT_VALUE(p) > 0); -    pa_mutex_lock(p->mutex);      p->drain_callback = cb;      p->drain_callback_userdata = userdata; -    pa_mutex_unlock(p->mutex);  }  void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {      assert(p);      assert(PA_REFCNT_VALUE(p) > 0); -    pa_mutex_lock(p->mutex);      p->recieve_packet_callback = cb;      p->recieve_packet_callback_userdata = userdata; -    pa_mutex_unlock(p->mutex);  }  void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {      assert(p);      assert(PA_REFCNT_VALUE(p) > 0); -    pa_mutex_lock(p->mutex);      p->recieve_memblock_callback = cb;      p->recieve_memblock_callback_userdata = userdata; -    pa_mutex_unlock(p->mutex);  }  int pa_pstream_is_pending(pa_pstream *p) { @@ -901,8 +909,6 @@ pa_pstream* pa_pstream_ref(pa_pstream*p) {  void pa_pstream_close(pa_pstream *p) {      assert(p); -    pa_mutex_lock(p->mutex); -          p->dead = 1;      if (p->import) { @@ -920,25 +926,16 @@ void pa_pstream_close(pa_pstream *p) {          p->io = NULL;      } -    if (p->defer_event) { -        p->mainloop->defer_free(p->defer_event); -        p->defer_event = NULL; -    } -      p->die_callback = NULL;      p->drain_callback = NULL;      p->recieve_packet_callback = NULL;      p->recieve_memblock_callback = NULL; - -    pa_mutex_unlock(p->mutex);  }  void pa_pstream_use_shm(pa_pstream *p, int enable) {      assert(p);      assert(PA_REFCNT_VALUE(p) > 0); -    pa_mutex_lock(p->mutex); -      p->use_shm = enable;      if (enable) { @@ -953,6 +950,4 @@ void pa_pstream_use_shm(pa_pstream *p, int enable) {              p->export = NULL;          }      } - -    pa_mutex_unlock(p->mutex);  } diff --git a/src/pulsecore/resampler.c b/src/pulsecore/resampler.c index b0142049..c28c2fb3 100644 --- a/src/pulsecore/resampler.c +++ b/src/pulsecore/resampler.c @@ -51,8 +51,7 @@ struct pa_resampler {  };  struct impl_libsamplerate { -    pa_memblock *buf1_block, *buf2_block, *buf3_block, *buf4_block; -    float* buf1, *buf2, *buf3, *buf4; +    pa_memchunk buf1, buf2, buf3, buf4;      unsigned buf1_samples, buf2_samples, buf3_samples, buf4_samples;      pa_convert_to_float32ne_func_t to_float32ne_func; @@ -224,14 +223,14 @@ static void libsamplerate_free(pa_resampler *r) {      if (u->src_state)          src_delete(u->src_state); -    if (u->buf1_block) -        pa_memblock_unref(u->buf1_block); -    if (u->buf2_block) -        pa_memblock_unref(u->buf2_block); -    if (u->buf3_block) -        pa_memblock_unref(u->buf3_block); -    if (u->buf4_block) -        pa_memblock_unref(u->buf4_block); +    if (u->buf1.memblock) +        pa_memblock_unref(u->buf1.memblock); +    if (u->buf2.memblock) +        pa_memblock_unref(u->buf2.memblock); +    if (u->buf3.memblock) +        pa_memblock_unref(u->buf3.memblock); +    if (u->buf4.memblock) +        pa_memblock_unref(u->buf4.memblock);      pa_xfree(u);  } @@ -270,64 +269,80 @@ static void calc_map_table(pa_resampler *r) {      }  } -static float * convert_to_float(pa_resampler *r, void *input, unsigned n_frames) { +static pa_memchunk* convert_to_float(pa_resampler *r, pa_memchunk *input) {      struct impl_libsamplerate *u;      unsigned n_samples; +    void *src, *dst;      assert(r);      assert(input); +    assert(input->memblock); +          assert(r->impl_data);      u = r->impl_data;      /* Convert the incoming sample into floats and place them in buf1 */ -    if (!u->to_float32ne_func) +    if (!u->to_float32ne_func || !input->length)          return input; -    n_samples = n_frames * r->i_ss.channels; +    n_samples = (input->length / r->i_fz) * r->i_ss.channels; -    if (u->buf1_samples < n_samples) { -        if (u->buf1_block) -            pa_memblock_unref(u->buf1_block); +    if (!u->buf1.memblock || u->buf1_samples < n_samples) { +        if (u->buf1.memblock) +            pa_memblock_unref(u->buf1.memblock);          u->buf1_samples = n_samples; -        u->buf1_block = pa_memblock_new(r->mempool, sizeof(float) * n_samples); -        u->buf1 = u->buf1_block->data; +        u->buf1.memblock = pa_memblock_new(r->mempool, u->buf1.length = sizeof(float) * n_samples); +        u->buf1.index = 0;      } -     -    u->to_float32ne_func(n_samples, input, u->buf1); -    return u->buf1; +    src = (uint8_t*) pa_memblock_acquire(input->memblock) + input->index; +    dst = (uint8_t*) pa_memblock_acquire(u->buf1.memblock); +    u->to_float32ne_func(n_samples, src, dst); +    pa_memblock_release(input->memblock); +    pa_memblock_release(u->buf1.memblock); + +    u->buf1.length = sizeof(float) * n_samples; + +    return &u->buf1;  } -static float *remap_channels(pa_resampler *r, float *input, unsigned n_frames) { +static pa_memchunk *remap_channels(pa_resampler *r, pa_memchunk *input) {      struct impl_libsamplerate *u; -    unsigned n_samples; +    unsigned n_samples, n_frames;      int i_skip, o_skip;      unsigned oc; +    float *src, *dst;      assert(r);      assert(input); +    assert(input->memblock); +          assert(r->impl_data);      u = r->impl_data;      /* Remap channels and place the result int buf2 */ -    if (!u->map_required) +    if (!u->map_required || !input->length)          return input; -    n_samples = n_frames * r->o_ss.channels; +    n_samples = input->length / sizeof(float); +    n_frames = n_samples / r->o_ss.channels; -    if (u->buf2_samples < n_samples) { -        if (u->buf2_block) -            pa_memblock_unref(u->buf2_block); +    if (!u->buf2.memblock || u->buf2_samples < n_samples) { +        if (u->buf2.memblock) +            pa_memblock_unref(u->buf2.memblock);          u->buf2_samples = n_samples; -        u->buf2_block = pa_memblock_new(r->mempool, sizeof(float) * n_samples); -        u->buf2 = u->buf2_block->data; +        u->buf2.memblock = pa_memblock_new(r->mempool, u->buf2.length = sizeof(float) * n_samples); +        u->buf2.index = 0;      } -    memset(u->buf2, 0, n_samples * sizeof(float)); +    src = (float*) ((uint8_t*) pa_memblock_acquire(input->memblock) + input->index); +    dst = (float*) pa_memblock_acquire(u->buf2.memblock); +     +    memset(dst, 0, n_samples * sizeof(float));      o_skip = sizeof(float) * r->o_ss.channels;      i_skip = sizeof(float) * r->i_ss.channels; @@ -338,49 +353,57 @@ static float *remap_channels(pa_resampler *r, float *input, unsigned n_frames) {          for (i = 0; i < PA_CHANNELS_MAX && u->map_table[oc][i] >= 0; i++)              oil_vectoradd_f32( -                u->buf2 + oc, o_skip, -                u->buf2 + oc, o_skip, -                input + u->map_table[oc][i], i_skip, +                dst + oc, o_skip, +                dst + oc, o_skip, +                src + u->map_table[oc][i], i_skip,                  n_frames,                  &one, &one);      } -    return u->buf2; +    pa_memblock_release(input->memblock); +    pa_memblock_release(u->buf2.memblock); + +    u->buf2.length = n_frames * sizeof(float) * r->o_ss.channels; + +    return &u->buf2;  } -static float *resample(pa_resampler *r, float *input, unsigned *n_frames) { +static pa_memchunk *resample(pa_resampler *r, pa_memchunk *input) {      struct impl_libsamplerate *u;      SRC_DATA data; +    unsigned in_n_frames, in_n_samples;      unsigned out_n_frames, out_n_samples;      int ret;      assert(r);      assert(input); -    assert(n_frames);      assert(r->impl_data);      u = r->impl_data;      /* Resample the data and place the result in buf3 */ -    if (!u->src_state) +    if (!u->src_state || !input->length)          return input; -    out_n_frames = (*n_frames*r->o_ss.rate/r->i_ss.rate)+1024; +    in_n_samples = input->length / sizeof(float); +    in_n_frames = in_n_samples * r->o_ss.channels; + +    out_n_frames = (in_n_frames*r->o_ss.rate/r->i_ss.rate)+1024;      out_n_samples = out_n_frames * r->o_ss.channels; -    if (u->buf3_samples < out_n_samples) { -        if (u->buf3_block) -            pa_memblock_unref(u->buf3_block); +    if (!u->buf3.memblock || u->buf3_samples < out_n_samples) { +        if (u->buf3.memblock) +            pa_memblock_unref(u->buf3.memblock);          u->buf3_samples = out_n_samples; -        u->buf3_block = pa_memblock_new(r->mempool, sizeof(float) * out_n_samples); -        u->buf3 = u->buf3_block->data; +        u->buf3.memblock = pa_memblock_new(r->mempool, u->buf3.length = sizeof(float) * out_n_samples); +        u->buf3.index = 0;      } -    data.data_in = input; -    data.input_frames = *n_frames; +    data.data_in = (float*) ((uint8_t*) pa_memblock_acquire(input->memblock) + input->index); +    data.input_frames = in_n_frames; -    data.data_out = u->buf3; +    data.data_out = (float*) pa_memblock_acquire(u->buf3.memblock);      data.output_frames = out_n_frames;      data.src_ratio = (double) r->o_ss.rate / r->i_ss.rate; @@ -388,16 +411,20 @@ static float *resample(pa_resampler *r, float *input, unsigned *n_frames) {      ret = src_process(u->src_state, &data);      assert(ret == 0); -    assert((unsigned) data.input_frames_used == *n_frames); +    assert((unsigned) data.input_frames_used == in_n_frames); -    *n_frames = data.output_frames_gen; +    pa_memblock_release(input->memblock); +    pa_memblock_release(u->buf3.memblock); +     +    u->buf3.length = data.output_frames_gen * sizeof(float) * r->o_ss.channels; -    return u->buf3; +    return &u->buf3;  } -static void *convert_from_float(pa_resampler *r, float *input, unsigned n_frames) { +static pa_memchunk *convert_from_float(pa_resampler *r, pa_memchunk *input) {      struct impl_libsamplerate *u; -    unsigned n_samples; +    unsigned n_samples, n_frames; +    void *src, *dst;      assert(r);      assert(input); @@ -406,30 +433,35 @@ static void *convert_from_float(pa_resampler *r, float *input, unsigned n_frames      /* Convert the data into the correct sample type and place the result in buf4 */ -    if (!u->from_float32ne_func) +    if (!u->from_float32ne_func || !input->length)          return input; -     + +    n_frames = input->length / sizeof(float) / r->o_ss.channels;      n_samples = n_frames * r->o_ss.channels;      if (u->buf4_samples < n_samples) { -        if (u->buf4_block) -            pa_memblock_unref(u->buf4_block); +        if (u->buf4.memblock) +            pa_memblock_unref(u->buf4.memblock);          u->buf4_samples = n_samples; -        u->buf4_block = pa_memblock_new(r->mempool, sizeof(float) * n_samples); -        u->buf4 = u->buf4_block->data; +        u->buf4.memblock = pa_memblock_new(r->mempool, u->buf4.length = r->o_fz * n_frames); +        u->buf4.index = 0;      } -         -    u->from_float32ne_func(n_samples, input, u->buf4); -    return u->buf4; +    src = (uint8_t*) pa_memblock_acquire(input->memblock) + input->length; +    dst = pa_memblock_acquire(u->buf4.memblock); +    u->from_float32ne_func(n_samples, src, dst); +    pa_memblock_release(input->memblock); +    pa_memblock_release(u->buf4.memblock); + +    u->buf4.length = r->o_fz * n_frames; +     +    return &u->buf4;  }  static void libsamplerate_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out) {      struct impl_libsamplerate *u; -    float *buf; -    void *input, *output; -    unsigned n_frames; +    pa_memchunk *buf;      assert(r);      assert(in); @@ -441,55 +473,23 @@ static void libsamplerate_run(pa_resampler *r, const pa_memchunk *in, pa_memchun      u = r->impl_data; -    input = ((uint8_t*) in->memblock->data + in->index); -    n_frames = in->length / r->i_fz; -    assert(n_frames > 0); -     -    buf = convert_to_float(r, input, n_frames); -    buf = remap_channels(r, buf, n_frames); -    buf = resample(r, buf, &n_frames); - -    if (n_frames) { -        output = convert_from_float(r, buf, n_frames); - -        if (output == input) { -            /* Mm, no adjustment has been necessary, so let's return the original block */ -            out->memblock = pa_memblock_ref(in->memblock); -            out->index = in->index; -            out->length = in->length; -        } else { -            out->length = n_frames * r->o_fz; -            out->index = 0; -            out->memblock = NULL; -             -            if (output == u->buf1) { -                u->buf1 = NULL; -                u->buf1_samples = 0; -                out->memblock = u->buf1_block; -                u->buf1_block = NULL; -            } else if (output == u->buf2) { -                u->buf2 = NULL; -                u->buf2_samples = 0; -                out->memblock = u->buf2_block; -                u->buf2_block = NULL; -            } else if (output == u->buf3) { -                u->buf3 = NULL; -                u->buf3_samples = 0; -                out->memblock = u->buf3_block; -                u->buf3_block = NULL; -            } else if (output == u->buf4) { -                u->buf4 = NULL; -                u->buf4_samples = 0; -                out->memblock = u->buf4_block; -                u->buf4_block = NULL; -            } - -            assert(out->memblock); -        } -    } else { -        out->memblock = NULL; -        out->index = out->length = 0; -    } +    buf = convert_to_float(r, (pa_memchunk*) in); +    buf = remap_channels(r, buf); +    buf = resample(r, buf); + +    if (buf->length) { +        buf = convert_from_float(r, buf); +        *out = *buf; + +        if (buf == in) +            pa_memblock_ref(buf->memblock); +        else +            pa_memchunk_reset(buf); +    } else +        pa_memchunk_reset(out); + +    pa_memblock_release(in->memblock); +      }  static void libsamplerate_update_input_rate(pa_resampler *r, uint32_t rate) { @@ -516,8 +516,10 @@ static int libsamplerate_init(pa_resampler *r) {      r->impl_data = u = pa_xnew(struct impl_libsamplerate, 1); -    u->buf1 = u->buf2 = u->buf3 = u->buf4 = NULL; -    u->buf1_block = u->buf2_block = u->buf3_block = u->buf4_block = NULL; +    pa_memchunk_reset(&u->buf1); +    pa_memchunk_reset(&u->buf2); +    pa_memchunk_reset(&u->buf3); +    pa_memchunk_reset(&u->buf4);      u->buf1_samples = u->buf2_samples = u->buf3_samples = u->buf4_samples = 0;      if (r->i_ss.format == PA_SAMPLE_FLOAT32NE) @@ -578,12 +580,16 @@ static void trivial_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out          /* Do real resampling */          size_t l;          unsigned o_index; +        void *src, *dst;          /* The length of the new memory block rounded up */          l = ((((n_frames+1) * r->o_ss.rate) / r->i_ss.rate) + 1) * fz;          out->index = 0;          out->memblock = pa_memblock_new(r->mempool, l); + +        src = (uint8_t*) pa_memblock_acquire(in->memblock) + in->index; +        dst = pa_memblock_acquire(out->memblock);          for (o_index = 0;; o_index++, u->o_counter++) {              unsigned j; @@ -594,13 +600,16 @@ static void trivial_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out              if (j >= n_frames)                  break; -            assert(o_index*fz < out->memblock->length); +            assert(o_index*fz < pa_memblock_get_length(out->memblock)); -            memcpy((uint8_t*) out->memblock->data + fz*o_index, -                   (uint8_t*) in->memblock->data + in->index + fz*j, fz); +            memcpy((uint8_t*) dst + fz*o_index, +                   (uint8_t*) src + fz*j, fz);          } -             + +        pa_memblock_release(in->memblock); +        pa_memblock_release(out->memblock); +                  out->length = o_index*fz;      } diff --git a/src/pulsecore/sample-util.c b/src/pulsecore/sample-util.c index d902b4b5..52023d31 100644 --- a/src/pulsecore/sample-util.c +++ b/src/pulsecore/sample-util.c @@ -46,15 +46,27 @@ pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spe  }  pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec) { -    assert(b && b->data && spec); -    pa_silence_memory(b->data, b->length, spec); +    void *data; + +    assert(b); +    assert(spec); +     +    data = pa_memblock_acquire(b); +    pa_silence_memory(data, pa_memblock_get_length(b), spec); +    pa_memblock_release(b);      return b;  }  void pa_silence_memchunk(pa_memchunk *c, const pa_sample_spec *spec) { -    assert(c && c->memblock && c->memblock->data && spec && c->length); +    void *data; +     +    assert(c); +    assert(c->memblock); +    assert(spec); -    pa_silence_memory((uint8_t*) c->memblock->data+c->index, c->length, spec); +    data = pa_memblock_acquire(c->memblock); +    pa_silence_memory((uint8_t*) data+c->index, c->length, spec); +    pa_memblock_release(c->memblock);  }  void pa_silence_memory(void *p, size_t length, const pa_sample_spec *spec) { @@ -82,26 +94,38 @@ void pa_silence_memory(void *p, size_t length, const pa_sample_spec *spec) {  }  size_t pa_mix( -    const pa_mix_info streams[], -    unsigned nstreams, -    void *data, -    size_t length, -    const pa_sample_spec *spec, -    const pa_cvolume *volume, -    int mute) { +        pa_mix_info streams[], +        unsigned nstreams, +        void *data, +        size_t length, +        const pa_sample_spec *spec, +        const pa_cvolume *volume, +        int mute) { + +    pa_cvolume full_volume; +    size_t d = 0; +    unsigned k; -    assert(streams && data && length && spec); +    assert(streams); +    assert(data); +    assert(length); +    assert(spec); +    if (!volume) +        volume = pa_cvolume_reset(&full_volume, spec->channels); + +    for (k = 0; k < nstreams; k++) +        streams[k].internal = pa_memblock_acquire(streams[k].chunk.memblock); +          switch (spec->format) {          case PA_SAMPLE_S16NE:{ -            size_t d;              unsigned channel = 0;              for (d = 0;; d += sizeof(int16_t)) {                  int32_t sum = 0;                  if (d >= length) -                    return d; +                    goto finish;                  if (!mute && volume->values[channel] != PA_VOLUME_MUTED) {                      unsigned i; @@ -111,12 +135,12 @@ size_t pa_mix(                          pa_volume_t cvolume = streams[i].volume.values[channel];                          if (d >= streams[i].chunk.length) -                            return d; +                            goto finish;                          if (cvolume == PA_VOLUME_MUTED)                              v = 0;                          else { -                            v = *((int16_t*) ((uint8_t*) streams[i].chunk.memblock->data + streams[i].chunk.index + d)); +                            v = *((int16_t*) ((uint8_t*) streams[i].internal + streams[i].chunk.index + d));                              if (cvolume != PA_VOLUME_NORM)                                  v = (int32_t) (v * pa_sw_volume_to_linear(cvolume)); @@ -139,17 +163,18 @@ size_t pa_mix(                  if (++channel >= spec->channels)                      channel = 0;              } +             +            break;          }          case PA_SAMPLE_S16RE:{ -            size_t d;              unsigned channel = 0;              for (d = 0;; d += sizeof(int16_t)) {                  int32_t sum = 0;                  if (d >= length) -                    return d; +                    goto finish;                  if (!mute && volume->values[channel] != PA_VOLUME_MUTED) {                      unsigned i; @@ -159,12 +184,12 @@ size_t pa_mix(                          pa_volume_t cvolume = streams[i].volume.values[channel];                          if (d >= streams[i].chunk.length) -                            return d; +                            goto finish;                          if (cvolume == PA_VOLUME_MUTED)                              v = 0;                          else { -                            v = INT16_SWAP(*((int16_t*) ((uint8_t*) streams[i].chunk.memblock->data + streams[i].chunk.index + d))); +                            v = INT16_SWAP(*((int16_t*) ((uint8_t*) streams[i].internal + streams[i].chunk.index + d)));                              if (cvolume != PA_VOLUME_NORM)                                  v = (int32_t) (v * pa_sw_volume_to_linear(cvolume)); @@ -187,17 +212,18 @@ size_t pa_mix(                  if (++channel >= spec->channels)                      channel = 0;              } + +            break;          }          case PA_SAMPLE_U8: { -            size_t d;              unsigned channel = 0;              for (d = 0;; d ++) {                  int32_t sum = 0;                  if (d >= length) -                    return d; +                    goto finish;                  if (!mute && volume->values[channel] != PA_VOLUME_MUTED) {                      unsigned i; @@ -207,12 +233,12 @@ size_t pa_mix(                          pa_volume_t cvolume = streams[i].volume.values[channel];                          if (d >= streams[i].chunk.length) -                            return d; +                            goto finish;                          if (cvolume == PA_VOLUME_MUTED)                              v = 0;                          else { -                            v = (int32_t) *((uint8_t*) streams[i].chunk.memblock->data + streams[i].chunk.index + d) - 0x80; +                            v = (int32_t) *((uint8_t*) streams[i].internal + streams[i].chunk.index + d) - 0x80;                              if (cvolume != PA_VOLUME_NORM)                                  v = (int32_t) (v * pa_sw_volume_to_linear(cvolume)); @@ -235,17 +261,18 @@ size_t pa_mix(                  if (++channel >= spec->channels)                      channel = 0;              } +             +            break;          }          case PA_SAMPLE_FLOAT32NE: { -            size_t d;              unsigned channel = 0;              for (d = 0;; d += sizeof(float)) {                  float sum = 0;                  if (d >= length) -                    return d; +                    goto finish;                  if (!mute && volume->values[channel] != PA_VOLUME_MUTED) {                      unsigned i; @@ -255,12 +282,12 @@ size_t pa_mix(                          pa_volume_t cvolume = streams[i].volume.values[channel];                          if (d >= streams[i].chunk.length) -                            return d; +                            goto finish;                          if (cvolume == PA_VOLUME_MUTED)                              v = 0;                          else { -                            v = *((float*) ((uint8_t*) streams[i].chunk.memblock->data + streams[i].chunk.index + d)); +                            v = *((float*) ((uint8_t*) streams[i].internal + streams[i].chunk.index + d));                              if (cvolume != PA_VOLUME_NORM)                                  v *= pa_sw_volume_to_linear(cvolume); @@ -279,17 +306,34 @@ size_t pa_mix(                  if (++channel >= spec->channels)                      channel = 0;              } + +            break;          }          default:              pa_log_error("ERROR: Unable to mix audio data of format %s.", pa_sample_format_to_string(spec->format));              abort();      } + +finish: + +    for (k = 0; k < nstreams; k++) +        pa_memblock_release(streams[k].chunk.memblock); +     +    return d;  } -void pa_volume_memchunk(pa_memchunk*c, const pa_sample_spec *spec, const pa_cvolume *volume) { -    assert(c && spec && (c->length % pa_frame_size(spec) == 0)); +void pa_volume_memchunk( +        pa_memchunk*c, +        const pa_sample_spec *spec, +        const pa_cvolume *volume) { + +    void *ptr; +     +    assert(c); +    assert(spec); +    assert(c->length % pa_frame_size(spec) == 0);      assert(volume);      if (pa_cvolume_channels_equal_to(volume, PA_VOLUME_NORM)) @@ -300,6 +344,8 @@ void pa_volume_memchunk(pa_memchunk*c, const pa_sample_spec *spec, const pa_cvol          return;      } +    ptr = pa_memblock_acquire(c->memblock); +      switch (spec->format) {          case PA_SAMPLE_S16NE: {              int16_t *d; @@ -310,7 +356,7 @@ void pa_volume_memchunk(pa_memchunk*c, const pa_sample_spec *spec, const pa_cvol              for (channel = 0; channel < spec->channels; channel++)                  linear[channel] = pa_sw_volume_to_linear(volume->values[channel]); -            for (channel = 0, d = (int16_t*) ((uint8_t*) c->memblock->data+c->index), n = c->length/sizeof(int16_t); n > 0; d++, n--) { +            for (channel = 0, d = (int16_t*) ((uint8_t*) ptr + c->index), n = c->length/sizeof(int16_t); n > 0; d++, n--) {                  int32_t t = (int32_t)(*d);                  t = (int32_t) (t * linear[channel]); @@ -335,7 +381,7 @@ void pa_volume_memchunk(pa_memchunk*c, const pa_sample_spec *spec, const pa_cvol              for (channel = 0; channel < spec->channels; channel++)                  linear[channel] = pa_sw_volume_to_linear(volume->values[channel]); -            for (channel = 0, d = (int16_t*) ((uint8_t*) c->memblock->data+c->index), n = c->length/sizeof(int16_t); n > 0; d++, n--) { +            for (channel = 0, d = (int16_t*) ((uint8_t*) ptr + c->index), n = c->length/sizeof(int16_t); n > 0; d++, n--) {                  int32_t t = (int32_t)(INT16_SWAP(*d));                  t = (int32_t) (t * linear[channel]); @@ -357,7 +403,7 @@ void pa_volume_memchunk(pa_memchunk*c, const pa_sample_spec *spec, const pa_cvol              size_t n;              unsigned channel = 0; -            for (d = (uint8_t*) c->memblock->data + c->index, n = c->length; n > 0; d++, n--) { +            for (d = (uint8_t*) ptr + c->index, n = c->length; n > 0; d++, n--) {                  int32_t t = (int32_t) *d - 0x80;                  t = (int32_t) (t * pa_sw_volume_to_linear(volume->values[channel])); @@ -379,7 +425,7 @@ void pa_volume_memchunk(pa_memchunk*c, const pa_sample_spec *spec, const pa_cvol              unsigned n;              unsigned channel; -            d = (float*) ((uint8_t*) c->memblock->data + c->index); +            d = (float*) ((uint8_t*) ptr + c->index);              skip = spec->channels * sizeof(float);              n = c->length/sizeof(float)/spec->channels; @@ -402,5 +448,7 @@ void pa_volume_memchunk(pa_memchunk*c, const pa_sample_spec *spec, const pa_cvol                  pa_sample_format_to_string(spec->format));              abort();      } + +    pa_memblock_release(c->memblock);  } diff --git a/src/pulsecore/sample-util.h b/src/pulsecore/sample-util.h index 6b770792..04c2f6b1 100644 --- a/src/pulsecore/sample-util.h +++ b/src/pulsecore/sample-util.h @@ -36,10 +36,11 @@ typedef struct pa_mix_info {      pa_memchunk chunk;      pa_cvolume volume;      void *userdata; +    void *internal; /* Used internally by pa_mix(), should not be initialised when calling pa_mix() */  } pa_mix_info;  size_t pa_mix( -    const pa_mix_info channels[], +    pa_mix_info channels[],      unsigned nchannels,      void *data,      size_t length, diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c index d948f0a4..c3cd4952 100644 --- a/src/pulsecore/sink-input.c +++ b/src/pulsecore/sink-input.c @@ -294,6 +294,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)      assert(i->state == PA_SINK_INPUT_RUNNING || i->state == PA_SINK_INPUT_DRAINED);      if (i->move_silence > 0) { +        size_t l;          /* We have just been moved and shall play some silence for a           * while until the old sink has drained its playback buffer */ @@ -303,7 +304,8 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)          chunk->memblock = pa_memblock_ref(i->silence_memblock);          chunk->index = 0; -        chunk->length = i->move_silence < chunk->memblock->length ? i->move_silence : chunk->memblock->length; +        l = pa_memblock_get_length(chunk->memblock); +        chunk->length = i->move_silence < l ? i->move_silence : l;          ret = 0;          do_volume_adj_here = 1; @@ -389,10 +391,13 @@ void pa_sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t lengt      if (i->move_silence > 0) {          if (chunk) { +            size_t l; +            l = pa_memblock_get_length(i->silence_memblock); +                          if (chunk->memblock != i->silence_memblock ||                  chunk->index != 0 || -                (chunk->memblock && (chunk->length != (i->silence_memblock->length < i->move_silence ? i->silence_memblock->length : i->move_silence))))  +                (chunk->memblock && (chunk->length != (l < i->move_silence ? l : i->move_silence))))                   return;          } diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 05695254..04795e39 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -237,7 +237,6 @@ static unsigned fill_mix_info(pa_sink *s, pa_mix_info *info, unsigned maxinfo) {          info->userdata = i;          assert(info->chunk.memblock); -        assert(info->chunk.memblock->data);          assert(info->chunk.length);          info++; @@ -305,13 +304,16 @@ int pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {                  pa_volume_memchunk(result, &s->sample_spec, &volume);          }      } else { +        void *ptr;          result->memblock = pa_memblock_new(s->core->mempool, length);          assert(result->memblock);  /*          pa_log("mixing %i", n);  */ -        result->length = pa_mix(info, n, result->memblock->data, length, -            &s->sample_spec, &s->sw_volume, s->sw_muted); +        ptr = pa_memblock_acquire(result->memblock); +        result->length = pa_mix(info, n, ptr, length, &s->sample_spec, &s->sw_volume, s->sw_muted); +        pa_memblock_release(result->memblock); +                  result->index = 0;      } @@ -332,13 +334,13 @@ int pa_sink_render_into(pa_sink*s, pa_memchunk *target) {      pa_mix_info info[MAX_MIX_CHANNELS];      unsigned n;      int r = -1; +    void *ptr;      assert(s);      assert(s->ref >= 1);      assert(target);      assert(target->memblock);      assert(target->length); -    assert(target->memblock->data);      pa_sink_ref(s); @@ -347,16 +349,23 @@ int pa_sink_render_into(pa_sink*s, pa_memchunk *target) {      if (n <= 0)          goto finish; +    ptr = pa_memblock_acquire(target->memblock); +          if (n == 1) { +        void *src;          pa_cvolume volume;          if (target->length > info[0].chunk.length)              target->length = info[0].chunk.length; + +        src = pa_memblock_acquire(info[0].chunk.memblock); -        memcpy((uint8_t*) target->memblock->data + target->index, -               (uint8_t*) info[0].chunk.memblock->data + info[0].chunk.index, +        memcpy((uint8_t*) ptr + target->index, +               (uint8_t*) src + info[0].chunk.index,                 target->length); +        pa_memblock_release(info[0].chunk.memblock); +          pa_sw_cvolume_multiply(&volume, &s->sw_volume, &info[0].volume);          if (s->sw_muted) @@ -365,11 +374,13 @@ int pa_sink_render_into(pa_sink*s, pa_memchunk *target) {              pa_volume_memchunk(target, &s->sample_spec, &volume);      } else          target->length = pa_mix(info, n, -                                (uint8_t*) target->memblock->data + target->index, +                                (uint8_t*) ptr + target->index,                                  target->length,                                  &s->sample_spec,                                  &s->sw_volume,                                  s->sw_muted); + +    pa_memblock_release(target->memblock);      inputs_drop(s, info, n, target->length); @@ -393,7 +404,6 @@ void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target) {      assert(target);      assert(target->memblock);      assert(target->length); -    assert(target->memblock->data);      pa_sink_ref(s); diff --git a/src/pulsecore/sound-file-stream.c b/src/pulsecore/sound-file-stream.c index e6f24a79..d2ffeeed 100644 --- a/src/pulsecore/sound-file-stream.c +++ b/src/pulsecore/sound-file-stream.c @@ -74,21 +74,26 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {      if (!u->memchunk.memblock) {          uint32_t fs = pa_frame_size(&i->sample_spec);          sf_count_t n; +        void *p;          u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE);          u->memchunk.index = 0; +        p = pa_memblock_acquire(u->memchunk.memblock); +                  if (u->readf_function) { -            if ((n = u->readf_function(u->sndfile, u->memchunk.memblock->data, BUF_SIZE/fs)) <= 0) +            if ((n = u->readf_function(u->sndfile, p, BUF_SIZE/fs)) <= 0)                  n = 0;              u->memchunk.length = n * fs;          } else { -            if ((n = sf_read_raw(u->sndfile, u->memchunk.memblock->data, BUF_SIZE)) <= 0) +            if ((n = sf_read_raw(u->sndfile, p, BUF_SIZE)) <= 0)                  n = 0;              u->memchunk.length = n;          } + +        pa_memblock_release(u->memchunk.memblock);          if (!u->memchunk.length) {              free_userdata(u); diff --git a/src/pulsecore/sound-file.c b/src/pulsecore/sound-file.c index 1bf650e2..c74a1586 100644 --- a/src/pulsecore/sound-file.c +++ b/src/pulsecore/sound-file.c @@ -40,7 +40,11 @@ int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss,      int ret = -1;      size_t l;      sf_count_t (*readf_function)(SNDFILE *sndfile, void *ptr, sf_count_t frames) = NULL; -    assert(fname && ss && chunk); +    void *ptr = NULL; +     +    assert(fname); +    assert(ss); +    assert(chunk);      chunk->memblock = NULL;      chunk->index = chunk->length = 0; @@ -97,8 +101,10 @@ int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss,      chunk->index = 0;      chunk->length = l; -    if ((readf_function && readf_function(sf, chunk->memblock->data, sfinfo.frames) != sfinfo.frames) || -        (!readf_function && sf_read_raw(sf, chunk->memblock->data, l) != l)) { +    ptr = pa_memblock_acquire(chunk->memblock); +     +    if ((readf_function && readf_function(sf, ptr, sfinfo.frames) != sfinfo.frames) || +        (!readf_function && sf_read_raw(sf, ptr, l) != l)) {          pa_log("Premature file end");          goto finish;      } @@ -110,6 +116,9 @@ finish:      if (sf)          sf_close(sf); +    if (ptr) +        pa_memblock_release(chunk->memblock); +          if (ret != 0 && chunk->memblock)          pa_memblock_unref(chunk->memblock); diff --git a/src/tests/flist-test.c b/src/tests/flist-test.c index abc0659d..06d68311 100644 --- a/src/tests/flist-test.c +++ b/src/tests/flist-test.c @@ -54,7 +54,7 @@ static void thread_func(void *data) {      int b = 1;      while (!quit) { -        char *text, *t; +        char *text;          /* Allocate some memory, if possible take it from the flist */          if (b && (text = pa_flist_pop(flist))) diff --git a/src/tests/mcalign-test.c b/src/tests/mcalign-test.c index 35691698..1584256c 100644 --- a/src/tests/mcalign-test.c +++ b/src/tests/mcalign-test.c @@ -59,24 +59,27 @@ int main(PA_GCC_UNUSED int argc, PA_GCC_UNUSED char *argv[]) {              c.index = c.length = 0;          } -        assert(c.index < c.memblock->length); +        assert(c.index < pa_memblock_get_length(c.memblock)); -        l = c.memblock->length - c.index; +        l = pa_memblock_get_length(c.memblock) - c.index;          l = l <= 1 ? l : rand() % (l-1) +1 ; -         -        if ((r = read(STDIN_FILENO, (uint8_t*) c.memblock->data + c.index, l)) <= 0) { + +        p = pa_memblock_acquire(c.memblock); +        if ((r = read(STDIN_FILENO, (uint8_t*) p + c.index, l)) <= 0) { +            pa_memblock_release(c.memblock);              fprintf(stderr, "read() failed: %s\n", r < 0 ? strerror(errno) : "EOF");              break;          } - +        pa_memblock_release(c.memblock); +                      c.length = r;          pa_mcalign_push(a, &c);          fprintf(stderr, "Read %ld bytes\n", (long)r);          c.index += r; -        if (c.index >= c.memblock->length) { +        if (c.index >= pa_memblock_get_length(c.memblock)) {              pa_memblock_unref(c.memblock);              pa_memchunk_reset(&c);          } @@ -87,7 +90,9 @@ int main(PA_GCC_UNUSED int argc, PA_GCC_UNUSED char *argv[]) {              if (pa_mcalign_pop(a, &t) < 0)                  break; -            pa_loop_write(STDOUT_FILENO, (uint8_t*) t.memblock->data + t.index, t.length, NULL); +            p = pa_memblock_acquire(t.memblock); +            pa_loop_write(STDOUT_FILENO, (uint8_t*) p + t.index, t.length, NULL); +            pa_memblock_release(t.memblock);              fprintf(stderr, "Wrote %lu bytes.\n", (unsigned long) t.length);              pa_memblock_unref(t.memblock); diff --git a/src/tests/memblock-test.c b/src/tests/memblock-test.c index ef2e0ad7..c2dd2efa 100644 --- a/src/tests/memblock-test.c +++ b/src/tests/memblock-test.c @@ -76,6 +76,7 @@ int main(int argc, char *argv[]) {      pa_memblock* blocks[5];      uint32_t id, shm_id;      size_t offset, size; +    char *x;      const char txt[] = "This is a test!"; @@ -90,10 +91,17 @@ int main(int argc, char *argv[]) {      assert(pool_a && pool_b && pool_c);      blocks[0] = pa_memblock_new_fixed(pool_a, (void*) txt, sizeof(txt), 1); +      blocks[1] = pa_memblock_new(pool_a, sizeof(txt)); -    snprintf(blocks[1]->data, blocks[1]->length, "%s", txt); +    x = pa_memblock_acquire(blocks[1]); +    snprintf(x, pa_memblock_get_length(blocks[1]), "%s", txt); +    pa_memblock_release(blocks[1]); +          blocks[2] = pa_memblock_new_pool(pool_a, sizeof(txt)); -    snprintf(blocks[2]->data, blocks[2]->length, "%s", txt); +    x = pa_memblock_acquire(blocks[2]); +    snprintf(x, pa_memblock_get_length(blocks[2]), "%s", txt); +    pa_memblock_release(blocks[1]); +      blocks[3] = pa_memblock_new_malloced(pool_a, pa_xstrdup(txt), sizeof(txt));      blocks[4] = NULL; @@ -130,14 +138,18 @@ int main(int argc, char *argv[]) {          mb_c = pa_memimport_get(import_c, id, shm_id, offset, size);          assert(mb_c); -        printf("1 data=%s\n", (char*) mb_c->data); +        x = pa_memblock_acquire(mb_c); +        printf("1 data=%s\n", x); +        pa_memblock_release(mb_c);          print_stats(pool_a, "A");          print_stats(pool_b, "B");          print_stats(pool_c, "C");          pa_memexport_free(export_b); -        printf("2 data=%s\n", (char*) mb_c->data); +        x = pa_memblock_acquire(mb_c); +        printf("2 data=%s\n", x); +        pa_memblock_release(mb_c);          pa_memblock_unref(mb_c);          pa_memimport_free(import_b); diff --git a/src/tests/memblockq-test.c b/src/tests/memblockq-test.c index 1ac4577b..02848eb2 100644 --- a/src/tests/memblockq-test.c +++ b/src/tests/memblockq-test.c @@ -131,8 +131,10 @@ int main(int argc, char *argv[]) {          if (pa_memblockq_peek(bq, &out) < 0)              break; -        for (e = (char*) out.memblock->data + out.index, n = 0; n < out.length; n++) +        p = pa_memblock_acquire(out.memblock); +        for (e = (char*) p + out.index, n = 0; n < out.length; n++)              printf("%c", *e); +        pa_memblock_release(out.memblock);          pa_memblock_unref(out.memblock);          pa_memblockq_drop(bq, &out, out.length);  | 
