summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2006-08-18 19:55:18 +0000
committerLennart Poettering <lennart@poettering.net>2006-08-18 19:55:18 +0000
commit0e436a6926af56f37a74a03bb5e143e078ca0d55 (patch)
treef837bc433ba8b6f904e6d512f9c9c2552a9827db /src
parentff48681aaef919cd2c85e4572928e936397a615c (diff)
Rework memory management to allow shared memory data transfer. The central idea
is to allocate all audio memory blocks from a per-process memory pool which is available as read-only SHM segment to other local processes. Then, instead of writing the actual audio data to the socket just write references to this shared memory pool. To work optimally all memory blocks should now be of type PA_MEMBLOCK_POOL or PA_MEMBLOCK_POOL_EXTERNAL. The function pa_memblock_new() now generates memory blocks of this type by default. git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1266 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src')
-rw-r--r--src/daemon/main.c2
-rw-r--r--src/modules/module-alsa-sink.c2
-rw-r--r--src/modules/module-alsa-source.c2
-rw-r--r--src/modules/module-combine.c3
-rw-r--r--src/modules/module-jack-source.c2
-rw-r--r--src/modules/module-oss-mmap.c6
-rw-r--r--src/modules/module-oss.c4
-rw-r--r--src/modules/module-pipe-source.c2
-rw-r--r--src/modules/module-sine.c2
-rw-r--r--src/modules/module-tunnel.c2
-rw-r--r--src/modules/rtp/module-rtp-recv.c11
-rw-r--r--src/modules/rtp/module-rtp-send.c3
-rw-r--r--src/modules/rtp/rtp.c4
-rw-r--r--src/modules/rtp/rtp.h2
-rw-r--r--src/pulse/context.c8
-rw-r--r--src/pulse/internal.h2
-rw-r--r--src/pulse/stream.c7
-rw-r--r--src/pulsecore/cli-command.c40
-rw-r--r--src/pulsecore/core-scache.c4
-rw-r--r--src/pulsecore/core.c6
-rw-r--r--src/pulsecore/core.h4
-rw-r--r--src/pulsecore/mcalign.c6
-rw-r--r--src/pulsecore/mcalign.h2
-rw-r--r--src/pulsecore/memblock.c737
-rw-r--r--src/pulsecore/memblock.h108
-rw-r--r--src/pulsecore/memblockq.c7
-rw-r--r--src/pulsecore/memblockq.h3
-rw-r--r--src/pulsecore/memchunk.c9
-rw-r--r--src/pulsecore/memchunk.h2
-rw-r--r--src/pulsecore/protocol-esound.c10
-rw-r--r--src/pulsecore/protocol-native.c25
-rw-r--r--src/pulsecore/protocol-simple.c8
-rw-r--r--src/pulsecore/pstream.c396
-rw-r--r--src/pulsecore/pstream.h4
-rw-r--r--src/pulsecore/resampler.c21
-rw-r--r--src/pulsecore/resampler.h12
-rw-r--r--src/pulsecore/sample-util.c7
-rw-r--r--src/pulsecore/sample-util.h2
-rw-r--r--src/pulsecore/sink-input.c12
-rw-r--r--src/pulsecore/sink.c6
-rw-r--r--src/pulsecore/sound-file-stream.c2
-rw-r--r--src/pulsecore/sound-file.c4
-rw-r--r--src/pulsecore/sound-file.h2
-rw-r--r--src/pulsecore/source-output.c4
-rw-r--r--src/pulsecore/source.c2
45 files changed, 1216 insertions, 293 deletions
diff --git a/src/daemon/main.c b/src/daemon/main.c
index 38d465f8..aada0ad7 100644
--- a/src/daemon/main.c
+++ b/src/daemon/main.c
@@ -559,7 +559,7 @@ int main(int argc, char *argv[]) {
mainloop = pa_mainloop_new();
assert(mainloop);
- c = pa_core_new(pa_mainloop_get_api(mainloop));
+ c = pa_core_new(pa_mainloop_get_api(mainloop), 1);
assert(c);
c->is_system_instance = !!conf->system_instance;
diff --git a/src/modules/module-alsa-sink.c b/src/modules/module-alsa-sink.c
index 8da3d236..0cebd50f 100644
--- a/src/modules/module-alsa-sink.c
+++ b/src/modules/module-alsa-sink.c
@@ -492,7 +492,7 @@ int pa__init(pa_core *c, pa_module*m) {
pa_log_info(__FILE__": using %u fragments of size %lu bytes.", periods, (long unsigned)u->fragment_size);
- u->silence.memblock = pa_memblock_new(u->silence.length = u->fragment_size, c->memblock_stat);
+ u->silence.memblock = pa_memblock_new(c->mempool, u->silence.length = u->fragment_size);
assert(u->silence.memblock);
pa_silence_memblock(u->silence.memblock, &ss);
u->silence.index = 0;
diff --git a/src/modules/module-alsa-source.c b/src/modules/module-alsa-source.c
index 4a8678c9..c3979df1 100644
--- a/src/modules/module-alsa-source.c
+++ b/src/modules/module-alsa-source.c
@@ -151,7 +151,7 @@ static void do_read(struct userdata *u) {
size_t l;
if (!u->memchunk.memblock) {
- u->memchunk.memblock = pa_memblock_new(u->memchunk.length = u->fragment_size, u->source->core->memblock_stat);
+ u->memchunk.memblock = pa_memblock_new(u->source->core->mempool, u->memchunk.length = u->fragment_size);
u->memchunk.index = 0;
}
diff --git a/src/modules/module-combine.c b/src/modules/module-combine.c
index 008fe6e7..5243975b 100644
--- a/src/modules/module-combine.c
+++ b/src/modules/module-combine.c
@@ -235,8 +235,7 @@ static struct output *output_new(struct userdata *u, pa_sink *sink, int resample
pa_frame_size(&u->sink->sample_spec),
1,
0,
- NULL,
- sink->core->memblock_stat);
+ NULL);
snprintf(t, sizeof(t), "%s: output #%u", u->sink->name, u->n_outputs+1);
diff --git a/src/modules/module-jack-source.c b/src/modules/module-jack-source.c
index 583f3b8e..8e659198 100644
--- a/src/modules/module-jack-source.c
+++ b/src/modules/module-jack-source.c
@@ -137,7 +137,7 @@ static void io_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_
fs = pa_frame_size(&u->source->sample_spec);
- chunk.memblock = pa_memblock_new(chunk.length = u->frames_posted * fs, u->core->memblock_stat);
+ chunk.memblock = pa_memblock_new(u->core->mempool, chunk.length = u->frames_posted * fs);
chunk.index = 0;
for (frame_idx = 0; frame_idx < u->frames_posted; frame_idx ++) {
diff --git a/src/modules/module-oss-mmap.c b/src/modules/module-oss-mmap.c
index c783a2f1..75ab9a9e 100644
--- a/src/modules/module-oss-mmap.c
+++ b/src/modules/module-oss-mmap.c
@@ -162,10 +162,10 @@ static void out_fill_memblocks(struct userdata *u, unsigned n) {
chunk.memblock = u->out_memblocks[u->out_current] =
pa_memblock_new_fixed(
+ u->core->mempool,
(uint8_t*) u->out_mmap+u->out_fragment_size*u->out_current,
u->out_fragment_size,
- 1,
- u->core->memblock_stat);
+ 1);
assert(chunk.memblock);
chunk.length = chunk.memblock->length;
chunk.index = 0;
@@ -210,7 +210,7 @@ static void in_post_memblocks(struct userdata *u, unsigned n) {
pa_memchunk chunk;
if (!u->in_memblocks[u->in_current]) {
- chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed((uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1, u->core->memblock_stat);
+ chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed(u->core->mempool, (uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1);
chunk.length = chunk.memblock->length;
chunk.index = 0;
diff --git a/src/modules/module-oss.c b/src/modules/module-oss.c
index ce11ee02..b9b80e72 100644
--- a/src/modules/module-oss.c
+++ b/src/modules/module-oss.c
@@ -217,7 +217,7 @@ static void do_read(struct userdata *u) {
}
do {
- memchunk.memblock = pa_memblock_new(l, u->core->memblock_stat);
+ memchunk.memblock = pa_memblock_new(u->core->mempool, l);
assert(memchunk.memblock);
if ((r = pa_iochannel_read(u->io, memchunk.memblock->data, memchunk.memblock->length)) < 0) {
pa_memblock_unref(memchunk.memblock);
@@ -503,7 +503,7 @@ int pa__init(pa_core *c, pa_module*m) {
u->out_fragment_size = out_frag_size;
u->in_fragment_size = in_frag_size;
- u->silence.memblock = pa_memblock_new(u->silence.length = u->out_fragment_size, u->core->memblock_stat);
+ u->silence.memblock = pa_memblock_new(u->core->mempool, u->silence.length = u->out_fragment_size);
assert(u->silence.memblock);
pa_silence_memblock(u->silence.memblock, &ss);
u->silence.index = 0;
diff --git a/src/modules/module-pipe-source.c b/src/modules/module-pipe-source.c
index 5caa60a3..43a8dab5 100644
--- a/src/modules/module-pipe-source.c
+++ b/src/modules/module-pipe-source.c
@@ -91,7 +91,7 @@ static void do_read(struct userdata *u) {
pa_module_set_used(u->module, pa_idxset_size(u->source->outputs));
if (!u->chunk.memblock) {
- u->chunk.memblock = pa_memblock_new(1024, u->core->memblock_stat);
+ u->chunk.memblock = pa_memblock_new(u->core->mempool, PIPE_BUF);
u->chunk.index = chunk.length = 0;
}
diff --git a/src/modules/module-sine.c b/src/modules/module-sine.c
index 5ceddce0..89c3c609 100644
--- a/src/modules/module-sine.c
+++ b/src/modules/module-sine.c
@@ -139,7 +139,7 @@ int pa__init(pa_core *c, pa_module*m) {
goto fail;
}
- u->memblock = pa_memblock_new(pa_bytes_per_second(&ss), c->memblock_stat);
+ u->memblock = pa_memblock_new(c->mempool, pa_bytes_per_second(&ss));
calc_sine(u->memblock->data, u->memblock->length, frequency);
snprintf(t, sizeof(t), "Sine Generator at %u Hz", frequency);
diff --git a/src/modules/module-tunnel.c b/src/modules/module-tunnel.c
index 9bb11c09..53bffd3b 100644
--- a/src/modules/module-tunnel.c
+++ b/src/modules/module-tunnel.c
@@ -651,7 +651,7 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata
return;
}
- u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->memblock_stat);
+ u->pstream = pa_pstream_new(u->core->mainloop, io, u->core->mempool);
u->pdispatch = pa_pdispatch_new(u->core->mainloop, command_table, PA_COMMAND_MAX);
pa_pstream_set_die_callback(u->pstream, pstream_die_callback, u);
diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c
index df6f8c11..5d3f3e27 100644
--- a/src/modules/rtp/module-rtp-recv.c
+++ b/src/modules/rtp/module-rtp-recv.c
@@ -150,7 +150,7 @@ static void rtp_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event
assert(fd == s->rtp_context.fd);
assert(flags == PA_IO_EVENT_INPUT);
- if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->core->memblock_stat) < 0)
+ if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->core->mempool) < 0)
return;
if (s->sdp_info.payload != s->rtp_context.payload) {
@@ -312,10 +312,10 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
s->sink_input->kill = sink_input_kill;
s->sink_input->get_latency = sink_input_get_latency;
- silence = pa_silence_memblock_new(&s->sink_input->sample_spec,
+ silence = pa_silence_memblock_new(s->userdata->core->mempool,
+ &s->sink_input->sample_spec,
(pa_bytes_per_second(&s->sink_input->sample_spec)/128/pa_frame_size(&s->sink_input->sample_spec))*
- pa_frame_size(&s->sink_input->sample_spec),
- s->userdata->core->memblock_stat);
+ pa_frame_size(&s->sink_input->sample_spec));
s->memblockq = pa_memblockq_new(
0,
@@ -324,8 +324,7 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
pa_frame_size(&s->sink_input->sample_spec),
pa_bytes_per_second(&s->sink_input->sample_spec)/10+1,
0,
- silence,
- u->core->memblock_stat);
+ silence);
pa_memblock_unref(silence);
diff --git a/src/modules/rtp/module-rtp-send.c b/src/modules/rtp/module-rtp-send.c
index 759aa819..1b85c840 100644
--- a/src/modules/rtp/module-rtp-send.c
+++ b/src/modules/rtp/module-rtp-send.c
@@ -297,8 +297,7 @@ int pa__init(pa_core *c, pa_module*m) {
pa_frame_size(&ss),
1,
0,
- NULL,
- c->memblock_stat);
+ NULL);
u->mtu = mtu;
diff --git a/src/modules/rtp/rtp.c b/src/modules/rtp/rtp.c
index ee037d42..8e77c60a 100644
--- a/src/modules/rtp/rtp.c
+++ b/src/modules/rtp/rtp.c
@@ -149,7 +149,7 @@ pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame
return c;
}
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st) {
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {
int size;
struct msghdr m;
struct iovec iov;
@@ -170,7 +170,7 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st) {
if (!size)
return 0;
- chunk->memblock = pa_memblock_new(size, st);
+ chunk->memblock = pa_memblock_new(pool, size);
iov.iov_base = chunk->memblock->data;
iov.iov_len = size;
diff --git a/src/modules/rtp/rtp.h b/src/modules/rtp/rtp.h
index 35fbbd35..123602b2 100644
--- a/src/modules/rtp/rtp.h
+++ b/src/modules/rtp/rtp.h
@@ -41,7 +41,7 @@ pa_rtp_context* pa_rtp_context_init_send(pa_rtp_context *c, int fd, uint32_t ssr
int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q);
pa_rtp_context* pa_rtp_context_init_recv(pa_rtp_context *c, int fd, size_t frame_size);
-int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_memblock_stat *st);
+int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool);
void pa_rtp_context_destroy(pa_rtp_context *c);
diff --git a/src/pulse/context.c b/src/pulse/context.c
index 34f517f0..b3530542 100644
--- a/src/pulse/context.c
+++ b/src/pulse/context.c
@@ -128,7 +128,7 @@ pa_context *pa_context_new(pa_mainloop_api *mainloop, const char *name) {
c->subscribe_callback = NULL;
c->subscribe_userdata = NULL;
- c->memblock_stat = pa_memblock_stat_new();
+ c->mempool = pa_mempool_new(1);
c->local = -1;
c->server_list = NULL;
c->server = NULL;
@@ -177,7 +177,7 @@ static void context_free(pa_context *c) {
if (c->playback_streams)
pa_dynarray_free(c->playback_streams, NULL, NULL);
- pa_memblock_stat_unref(c->memblock_stat);
+ pa_mempool_free(c->mempool);
if (c->conf)
pa_client_conf_free(c->conf);
@@ -407,7 +407,9 @@ static void setup_context(pa_context *c, pa_iochannel *io) {
pa_context_ref(c);
assert(!c->pstream);
- c->pstream = pa_pstream_new(c->mainloop, io, c->memblock_stat);
+ c->pstream = pa_pstream_new(c->mainloop, io, c->mempool);
+
+ pa_pstream_use_shm(c->pstream, 1);
pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
diff --git a/src/pulse/internal.h b/src/pulse/internal.h
index 96028d83..afcfaeff 100644
--- a/src/pulse/internal.h
+++ b/src/pulse/internal.h
@@ -69,7 +69,7 @@ struct pa_context {
pa_context_subscribe_cb_t subscribe_callback;
void *subscribe_userdata;
- pa_memblock_stat *memblock_stat;
+ pa_mempool *mempool;
int local;
int do_autospawn;
diff --git a/src/pulse/stream.c b/src/pulse/stream.c
index 677df009..180cd096 100644
--- a/src/pulse/stream.c
+++ b/src/pulse/stream.c
@@ -437,8 +437,7 @@ void pa_create_stream_callback(pa_pdispatch *pd, uint32_t command, PA_GCC_UNUSED
pa_frame_size(&s->sample_spec),
1,
0,
- NULL,
- s->context->memblock_stat);
+ NULL);
}
s->channel_valid = 1;
@@ -604,9 +603,9 @@ int pa_stream_write(
return 0;
if (free_cb)
- chunk.memblock = pa_memblock_new_user((void*) data, length, free_cb, 1, s->context->memblock_stat);
+ chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) data, length, free_cb, 1);
else {
- chunk.memblock = pa_memblock_new(length, s->context->memblock_stat);
+ chunk.memblock = pa_memblock_new(s->context->mempool, length);
memcpy(chunk.memblock->data, data, length);
}
diff --git a/src/pulsecore/cli-command.c b/src/pulsecore/cli-command.c
index f74258d3..811b96d2 100644
--- a/src/pulsecore/cli-command.c
+++ b/src/pulsecore/cli-command.c
@@ -100,6 +100,7 @@ static int pa_cli_command_dump(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int
static int pa_cli_command_list_props(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
static int pa_cli_command_move_source_output(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
+static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail);
/* A method table for all available commands */
@@ -144,6 +145,7 @@ static const struct command commands[] = {
{ "list-props", pa_cli_command_list_props, NULL, 1},
{ "move-sink-input", pa_cli_command_move_sink_input, "Move sink input to another sink (args: index, sink)", 3},
{ "move-source-output", pa_cli_command_move_source_output, "Move source output to another source (args: index, source)", 3},
+ { "vacuum", pa_cli_command_vacuum, NULL, 1},
{ NULL, NULL, NULL, 0 }
};
@@ -239,23 +241,32 @@ static int pa_cli_command_source_outputs(pa_core *c, pa_tokenizer *t, pa_strbuf
static int pa_cli_command_stat(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, PA_GCC_UNUSED int *fail) {
char s[256];
+ const pa_mempool_stat *stat;
assert(c && t);
- pa_bytes_snprint(s, sizeof(s), c->memblock_stat->total_size);
+ stat = pa_mempool_get_stat(c->mempool);
+
pa_strbuf_printf(buf, "Memory blocks currently allocated: %u, size: %s.\n",
- c->memblock_stat->total,
- s);
+ stat->n_allocated,
+ pa_bytes_snprint(s, sizeof(s), stat->allocated_size));
- pa_bytes_snprint(s, sizeof(s), c->memblock_stat->allocated_size);
pa_strbuf_printf(buf, "Memory blocks allocated during the whole lifetime: %u, size: %s.\n",
- c->memblock_stat->allocated,
- s);
+ stat->n_accumulated,
+ pa_bytes_snprint(s, sizeof(s), stat->accumulated_size));
+
+ pa_strbuf_printf(buf, "Memory blocks imported from other processes: %u, size: %s.\n",
+ stat->n_imported,
+ pa_bytes_snprint(s, sizeof(s), stat->imported_size));
- pa_bytes_snprint(s, sizeof(s), pa_scache_total_size(c));
- pa_strbuf_printf(buf, "Total sample cache size: %s.\n", s);
+ pa_strbuf_printf(buf, "Memory blocks exported to other processes: %u, size: %s.\n",
+ stat->n_exported,
+ pa_bytes_snprint(s, sizeof(s), stat->exported_size));
- pa_sample_spec_snprint(s, sizeof(s), &c->default_sample_spec);
- pa_strbuf_printf(buf, "Default sample spec: %s\n", s);
+ pa_strbuf_printf(buf, "Total sample cache size: %s.\n",
+ pa_bytes_snprint(s, sizeof(s), pa_scache_total_size(c)));
+
+ pa_strbuf_printf(buf, "Default sample spec: %s\n",
+ pa_sample_spec_snprint(s, sizeof(s), &c->default_sample_spec));
pa_strbuf_printf(buf, "Default sink name: %s\n"
"Default source name: %s\n",
@@ -731,6 +742,15 @@ static int pa_cli_command_list_props(pa_core *c, pa_tokenizer *t, pa_strbuf *buf
return 0;
}
+static int pa_cli_command_vacuum(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) {
+ assert(c);
+ assert(t);
+
+ pa_mempool_vacuum(c->mempool);
+
+ return 0;
+}
+
static int pa_cli_command_move_sink_input(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, int *fail) {
const char *n, *k;
pa_sink_input *si;
diff --git a/src/pulsecore/core-scache.c b/src/pulsecore/core-scache.c
index 377dd569..ca2408fe 100644
--- a/src/pulsecore/core-scache.c
+++ b/src/pulsecore/core-scache.c
@@ -176,7 +176,7 @@ int pa_scache_add_file(pa_core *c, const char *name, const char *filename, uint3
filename = buf;
#endif
- if (pa_sound_file_load(filename, &ss, &map, &chunk, c->memblock_stat) < 0)
+ if (pa_sound_file_load(c->mempool, filename, &ss, &map, &chunk) < 0)
return -1;
r = pa_scache_add_item(c, name, &ss, &map, &chunk, idx);
@@ -261,7 +261,7 @@ int pa_scache_play_item(pa_core *c, const char *name, pa_sink *sink, pa_volume_t
return -1;
if (e->lazy && !e->memchunk.memblock) {
- if (pa_sound_file_load(e->filename, &e->sample_spec, &e->channel_map, &e->memchunk, c->memblock_stat) < 0)
+ if (pa_sound_file_load(c->mempool, e->filename, &e->sample_spec, &e->channel_map, &e->memchunk) < 0)
return -1;
pa_subscription_post(c, PA_SUBSCRIPTION_EVENT_SAMPLE_CACHE|PA_SUBSCRIPTION_EVENT_CHANGE, e->index);
diff --git a/src/pulsecore/core.c b/src/pulsecore/core.c
index 7f2f0f60..5fdeab56 100644
--- a/src/pulsecore/core.c
+++ b/src/pulsecore/core.c
@@ -44,7 +44,7 @@
#include "core.h"
-pa_core* pa_core_new(pa_mainloop_api *m) {
+pa_core* pa_core_new(pa_mainloop_api *m, int shared) {
pa_core* c;
c = pa_xnew(pa_core, 1);
@@ -78,7 +78,7 @@ pa_core* pa_core_new(pa_mainloop_api *m) {
PA_LLIST_HEAD_INIT(pa_subscription_event, c->subscription_event_queue);
c->subscription_event_last = NULL;
- c->memblock_stat = pa_memblock_stat_new();
+ c->mempool = pa_mempool_new(shared);
c->disallow_module_loading = 0;
@@ -139,7 +139,7 @@ void pa_core_free(pa_core *c) {
pa_xfree(c->default_source_name);
pa_xfree(c->default_sink_name);
- pa_memblock_stat_unref(c->memblock_stat);
+ pa_mempool_free(c->mempool);
pa_property_cleanup(c);
diff --git a/src/pulsecore/core.h b/src/pulsecore/core.h
index f9fa386e..3a34d297 100644
--- a/src/pulsecore/core.h
+++ b/src/pulsecore/core.h
@@ -67,7 +67,7 @@ struct pa_core {
PA_LLIST_HEAD(pa_subscription_event, subscription_event_queue);
pa_subscription_event *subscription_event_last;
- pa_memblock_stat *memblock_stat;
+ pa_mempool *mempool;
int disallow_module_loading, running_as_daemon;
int exit_idle_time, module_idle_time, scache_idle_time;
@@ -88,7 +88,7 @@ struct pa_core {
hook_source_disconnect;
};
-pa_core* pa_core_new(pa_mainloop_api *m);
+pa_core* pa_core_new(pa_mainloop_api *m, int shared);
void pa_core_free(pa_core*c);
/* Check whether noone is connected to this core */
diff --git a/src/pulsecore/mcalign.c b/src/pulsecore/mcalign.c
index 8283a7a0..9ede610d 100644
--- a/src/pulsecore/mcalign.c
+++ b/src/pulsecore/mcalign.c
@@ -35,10 +35,9 @@
struct pa_mcalign {
size_t base;
pa_memchunk leftover, current;
- pa_memblock_stat *memblock_stat;
};
-pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s) {
+pa_mcalign *pa_mcalign_new(size_t base) {
pa_mcalign *m;
assert(base);
@@ -47,7 +46,6 @@ pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s) {
m->base = base;
pa_memchunk_reset(&m->leftover);
pa_memchunk_reset(&m->current);
- m->memblock_stat = s;
return m;
}
@@ -100,7 +98,7 @@ void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c) {
l = c->length;
/* Can we use the current block? */
- pa_memchunk_make_writable(&m->leftover, m->memblock_stat, m->base);
+ pa_memchunk_make_writable(&m->leftover, m->base);
memcpy((uint8_t*) m->leftover.memblock->data + m->leftover.index + m->leftover.length, (uint8_t*) c->memblock->data + c->index, l);
m->leftover.length += l;
diff --git a/src/pulsecore/mcalign.h b/src/pulsecore/mcalign.h
index 80e37499..94e99e21 100644
--- a/src/pulsecore/mcalign.h
+++ b/src/pulsecore/mcalign.h
@@ -63,7 +63,7 @@
typedef struct pa_mcalign pa_mcalign;
-pa_mcalign *pa_mcalign_new(size_t base, pa_memblock_stat *s);
+pa_mcalign *pa_mcalign_new(size_t base);
void pa_mcalign_free(pa_mcalign *m);
/* Push a new memchunk into the aligner. The caller of this routine
diff --git a/src/pulsecore/memblock.c b/src/pulsecore/memblock.c
index 36de17fb..4ce1b7c1 100644
--- a/src/pulsecore/memblock.c
+++ b/src/pulsecore/memblock.c
@@ -27,86 +27,271 @@
#include <stdlib.h>
#include <assert.h>
#include <string.h>
+#include <unistd.h>
#include <pulse/xmalloc.h>
+#include <pulsecore/shm.h>
+#include <pulsecore/log.h>
+#include <pulsecore/hashmap.h>
+
#include "memblock.h"
-static void stat_add(pa_memblock*m, pa_memblock_stat *s) {
- assert(m);
+#define PA_MEMPOOL_SLOTS_MAX 128
+#define PA_MEMPOOL_SLOT_SIZE (16*1024)
- if (!s) {
- m->stat = NULL;
- return;
- }
+#define PA_MEMEXPORT_SLOTS_MAX 128
+
+#define PA_MEMIMPORT_SLOTS_MAX 128
+#define PA_MEMIMPORT_SEGMENTS_MAX 16
+
+struct pa_memimport_segment {
+ pa_memimport *import;
+ pa_shm memory;
+ unsigned n_blocks;
+};
+
+struct pa_memimport {
+ pa_mempool *pool;
+ pa_hashmap *segments;
+ pa_hashmap *blocks;
+
+ /* Called whenever an imported memory block is no longer
+ * needed. */
+ pa_memimport_release_cb_t release_cb;
+ void *userdata;
+
+ PA_LLIST_FIELDS(pa_memimport);
+};
+
+struct memexport_slot {
+ PA_LLIST_FIELDS(struct memexport_slot);
+ pa_memblock *block;
+};
+
+struct pa_memexport {
+ pa_mempool *pool;
+
+ struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX];
+ PA_LLIST_HEAD(struct memexport_slot, free_slots);
+ PA_LLIST_HEAD(struct memexport_slot, used_slots);
+ unsigned n_init;
+
+ /* Called whenever a client from which we imported a memory block
+ which we in turn exported to another client dies and we need to
+ revoke the memory block accordingly */
+ pa_memexport_revoke_cb_t revoke_cb;
+ void *userdata;
+
+ PA_LLIST_FIELDS(pa_memexport);
+};
+
+struct mempool_slot {
+ PA_LLIST_FIELDS(struct mempool_slot);
+ /* the actual data follows immediately hereafter */
+};
- m->stat = pa_memblock_stat_ref(s);
- s->total++;
- s->allocated++;
- s->total_size += m->length;
- s->allocated_size += m->length;
+struct pa_mempool {
+ pa_shm memory;
+ size_t block_size;
+ unsigned n_blocks, n_init;
+
+ PA_LLIST_HEAD(pa_memimport, imports);
+ PA_LLIST_HEAD(pa_memexport, exports);
+
+ /* A list of free slots that may be reused */
+ PA_LLIST_HEAD(struct mempool_slot, free_slots);
+ PA_LLIST_HEAD(struct mempool_slot, used_slots);
+
+ pa_mempool_stat stat;
+};
+
+static void segment_detach(pa_memimport_segment *seg);
+
+static void stat_add(pa_memblock*b) {
+ assert(b);
+ assert(b->pool);
+
+ b->pool->stat.n_allocated ++;
+ b->pool->stat.n_accumulated ++;
+ b->pool->stat.allocated_size += b->length;
+ b->pool->stat.accumulated_size += b->length;
+
+ if (b->type == PA_MEMBLOCK_IMPORTED) {
+ b->pool->stat.n_imported++;
+ b->pool->stat.imported_size += b->length;
+ }
}
-static void stat_remove(pa_memblock *m) {
- assert(m);
+static void stat_remove(pa_memblock *b) {
+ assert(b);
+ assert(b->pool);
- if (!m->stat)
- return;
+ assert(b->pool->stat.n_allocated > 0);
+ assert(b->pool->stat.allocated_size >= b->length);
+
+ b->pool->stat.n_allocated --;
+ b->pool->stat.allocated_size -= b->length;
+
+ if (b->type == PA_MEMBLOCK_IMPORTED) {
+ assert(b->pool->stat.n_imported > 0);
+ assert(b->pool->stat.imported_size >= b->length);
+
+ b->pool->stat.n_imported --;
+ b->pool->stat.imported_size -= b->length;
+ }
+}
- m->stat->total--;
- m->stat->total_size -= m->length;
+static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length);
+
+pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
+ pa_memblock *b;
- pa_memblock_stat_unref(m->stat);
- m->stat = NULL;
+ assert(p);
+ assert(length > 0);
+
+ if (!(b = pa_memblock_new_pool(p, length)))
+ b = memblock_new_appended(p, length);
+
+ return b;
}
-pa_memblock *pa_memblock_new(size_t length, pa_memblock_stat*s) {
- pa_memblock *b = pa_xmalloc(sizeof(pa_memblock)+length);
+static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
+ pa_memblock *b;
+
+ assert(p);
+ assert(length > 0);
+
+ b = pa_xmalloc(sizeof(pa_memblock) + length);
b->type = PA_MEMBLOCK_APPENDED;
+ b->read_only = 0;
b->ref = 1;
b->length = length;
- b->data = b+1;
- b->free_cb = NULL;
- b->read_only = 0;
- stat_add(b, s);
+ b->data = (uint8_t*) b + sizeof(pa_memblock);
+ b->pool = p;
+
+ stat_add(b);
return b;
}
-pa_memblock *pa_memblock_new_dynamic(void *d, size_t length, pa_memblock_stat*s) {
- pa_memblock *b = pa_xmalloc(sizeof(pa_memblock));
- b->type = PA_MEMBLOCK_DYNAMIC;
- b->ref = 1;
+static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {
+ struct mempool_slot *slot;
+ assert(p);
+
+ if (p->free_slots) {
+ slot = p->free_slots;
+ PA_LLIST_REMOVE(struct mempool_slot, p->free_slots, slot);
+ } else if (p->n_init < p->n_blocks)
+ slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * p->n_init++));
+ else {
+ pa_log_debug(__FILE__": Pool full");
+ p->stat.n_pool_full++;
+ return NULL;
+ }
+
+ PA_LLIST_PREPEND(struct mempool_slot, p->used_slots, slot);
+ return slot;
+}
+
+static void* mempool_slot_data(struct mempool_slot *slot) {
+ assert(slot);
+
+ return (uint8_t*) slot + sizeof(struct mempool_slot);
+}
+
+static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {
+ assert(p);
+ assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);
+ assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);
+
+ return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;
+}
+
+static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
+ unsigned idx;
+
+ if ((idx = mempool_slot_idx(p, ptr)) == (unsigned) -1)
+ return NULL;
+
+ return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
+}
+
+pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
+ pa_memblock *b = NULL;
+ struct mempool_slot *slot;
+
+ assert(p);
+ assert(length > 0);
+
+ if (p->block_size - sizeof(struct mempool_slot) >= sizeof(pa_memblock) + length) {
+
+ if (!(slot = mempool_allocate_slot(p)))
+ return NULL;
+
+ b = mempool_slot_data(slot);
+ b->type = PA_MEMBLOCK_POOL;
+ b->data = (uint8_t*) b + sizeof(pa_memblock);
+
+ } else if (p->block_size - sizeof(struct mempool_slot) >= length) {
+
+ if (!(slot = mempool_allocate_slot(p)))
+ return NULL;
+
+ b = pa_xnew(pa_memblock, 1);
+ b->type = PA_MEMBLOCK_POOL_EXTERNAL;
+ b->data = mempool_slot_data(slot);
+ } else {
+ pa_log_debug(__FILE__": Memory block to large for pool: %u > %u", length, p->block_size - sizeof(struct mempool_slot));
+ p->stat.n_too_large_for_pool++;
+ return NULL;
+ }
+
b->length = length;
- b->data = d;
- b->free_cb = NULL;
b->read_only = 0;
- stat_add(b, s);
+ b->ref = 1;
+ b->pool = p;
+
+ stat_add(b);
return b;
}
-pa_memblock *pa_memblock_new_fixed(void *d, size_t length, int read_only, pa_memblock_stat*s) {
- pa_memblock *b = pa_xmalloc(sizeof(pa_memblock));
+pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) {
+ pa_memblock *b;
+
+ assert(p);
+ assert(d);
+ assert(length > 0);
+
+ b = pa_xnew(pa_memblock, 1);
b->type = PA_MEMBLOCK_FIXED;
+ b->read_only = read_only;
b->ref = 1;
b->length = length;
b->data = d;
- b->free_cb = NULL;
- b->read_only = read_only;
- stat_add(b, s);
+ b->pool = p;
+
+ stat_add(b);
return b;
}
-pa_memblock *pa_memblock_new_user(void *d, size_t length, void (*free_cb)(void *p), int read_only, pa_memblock_stat*s) {
+pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) {
pa_memblock *b;
- assert(d && length && free_cb);
- b = pa_xmalloc(sizeof(pa_memblock));
+
+ assert(p);
+ assert(d);
+ assert(length > 0);
+ assert(free_cb);
+
+ b = pa_xnew(pa_memblock, 1);
b->type = PA_MEMBLOCK_USER;
+ b->read_only = read_only;
b->ref = 1;
b->length = length;
b->data = d;
- b->free_cb = free_cb;
- b->read_only = read_only;
- stat_add(b, s);
+ b->per_type.user.free_cb = free_cb;
+ b->pool = p;
+
+ stat_add(b);
return b;
}
@@ -122,52 +307,458 @@ void pa_memblock_unref(pa_memblock*b) {
assert(b);
assert(b->ref >= 1);
- if ((--(b->ref)) == 0) {
- stat_remove(b);
+ if ((--(b->ref)) > 0)
+ return;
+
+ stat_remove(b);
+
+ switch (b->type) {
+ case PA_MEMBLOCK_USER :
+ assert(b->per_type.user.free_cb);
+ b->per_type.user.free_cb(b->data);
+
+ /* Fall through */
+
+ case PA_MEMBLOCK_FIXED:
+ case PA_MEMBLOCK_APPENDED :
+ pa_xfree(b);
+ break;
+
+ case PA_MEMBLOCK_IMPORTED : {
+ pa_memimport_segment *segment;
+
+ segment = b->per_type.imported.segment;
+ assert(segment);
+ assert(segment->import);
+
+ pa_hashmap_remove(segment->import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
+ segment->import->release_cb(segment->import, b->per_type.imported.id, segment->import->userdata);
+
+ if (-- segment->n_blocks <= 0)
+ segment_detach(segment);
+
+ pa_xfree(b);
+ break;
+ }
- if (b->type == PA_MEMBLOCK_USER) {
- assert(b->free_cb);
- b->free_cb(b->data);
- } else if (b->type == PA_MEMBLOCK_DYNAMIC)
- pa_xfree(b->data);
+ case PA_MEMBLOCK_POOL_EXTERNAL:
+ case PA_MEMBLOCK_POOL: {
+ struct mempool_slot *slot;
- pa_xfree(b);
+ slot = mempool_slot_by_ptr(b->pool, b->data);
+ assert(slot);
+
+ PA_LLIST_REMOVE(struct mempool_slot, b->pool->used_slots, slot);
+ PA_LLIST_PREPEND(struct mempool_slot, b->pool->free_slots, slot);
+
+ if (b->type == PA_MEMBLOCK_POOL_EXTERNAL)
+ pa_xfree(b);
+ }
}
}
+static void memblock_make_local(pa_memblock *b) {
+ assert(b);
+
+ if (b->length <= b->pool->block_size - sizeof(struct mempool_slot)) {
+ struct mempool_slot *slot;
+
+ if ((slot = mempool_allocate_slot(b->pool))) {
+ void *new_data;
+ /* We can move it into a local pool, perfect! */
+
+ b->type = PA_MEMBLOCK_POOL_EXTERNAL;
+ b->read_only = 0;
+
+ new_data = mempool_slot_data(slot);
+ memcpy(new_data, b->data, b->length);
+ b->data = new_data;
+ return;
+ }
+ }
+
+ /* Humm, not enough space in the pool, so lets allocate the memory with malloc() */
+ b->type = PA_MEMBLOCK_USER;
+ b->per_type.user.free_cb = pa_xfree;
+ b->read_only = 0;
+ b->data = pa_xmemdup(b->data, b->length);
+}
+
void pa_memblock_unref_fixed(pa_memblock *b) {
- assert(b && b->ref >= 1 && b->type == PA_MEMBLOCK_FIXED);
+ assert(b);
+ assert(b->ref >= 1);
+ assert(b->type == PA_MEMBLOCK_FIXED);
- if (b->ref == 1)
- pa_memblock_unref(b);
- else {
- b->data = pa_xmemdup(b->data, b->length);
- b->type = PA_MEMBLOCK_DYNAMIC;
- b->ref--;
+ if (b->ref > 1)
+ memblock_make_local(b);
+
+ pa_memblock_unref(b);
+}
+
+static void memblock_replace_import(pa_memblock *b) {
+ pa_memimport_segment *seg;
+
+ assert(b);
+ assert(b->type == PA_MEMBLOCK_IMPORTED);
+
+ assert(b->pool->stat.n_imported > 0);
+ assert(b->pool->stat.imported_size >= b->length);
+ b->pool->stat.n_imported --;
+ b->pool->stat.imported_size -= b->length;
+
+ seg = b->per_type.imported.segment;
+ assert(seg);
+ assert(seg->import);
+
+ pa_hashmap_remove(
+ seg->import->blocks,
+ PA_UINT32_TO_PTR(b->per_type.imported.id));
+
+ memblock_make_local(b);
+
+ if (-- seg->n_blocks <= 0)
+ segment_detach(seg);
+}
+
+pa_mempool* pa_mempool_new(int shared) {
+ size_t ps;
+ pa_mempool *p;
+
+ p = pa_xnew(pa_mempool, 1);
+
+ ps = (size_t) sysconf(_SC_PAGESIZE);
+
+ p->block_size = (PA_MEMPOOL_SLOT_SIZE/ps)*ps;
+
+ if (p->block_size < ps)
+ p->block_size = ps;
+
+ p->n_blocks = PA_MEMPOOL_SLOTS_MAX;
+
+ assert(p->block_size > sizeof(struct mempool_slot));
+
+ if (pa_shm_create_rw(&p->memory, p->n_blocks * p->block_size, shared, 0700) < 0) {
+ pa_xfree(p);
+ return NULL;
+ }
+
+ p->n_init = 0;
+
+ PA_LLIST_HEAD_INIT(pa_memimport, p->imports);
+ PA_LLIST_HEAD_INIT(pa_memexport, p->exports);
+ PA_LLIST_HEAD_INIT(struct mempool_slot, p->free_slots);
+ PA_LLIST_HEAD_INIT(struct mempool_slot, p->used_slots);
+
+ memset(&p->stat, 0, sizeof(p->stat));
+
+ return p;
+}
+
+void pa_mempool_free(pa_mempool *p) {
+ assert(p);
+
+ while (p->imports)
+ pa_memimport_free(p->imports);
+
+ while (p->exports)
+ pa_memexport_free(p->exports);
+
+ if (p->stat.n_allocated > 0)
+ pa_log_warn(__FILE__": WARNING! Memory pool destroyed but not all memory blocks freed!");
+
+ pa_shm_free(&p->memory);
+ pa_xfree(p);
+}
+
+const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {
+ assert(p);
+
+ return &p->stat;
+}
+
+void pa_mempool_vacuum(pa_mempool *p) {
+ struct mempool_slot *slot;
+
+ assert(p);
+
+ for (slot = p->free_slots; slot; slot = slot->next) {
+ pa_shm_punch(&p->memory, (uint8_t*) slot + sizeof(struct mempool_slot) - (uint8_t*) p->memory.ptr, p->block_size - sizeof(struct mempool_slot));
}
}
-pa_memblock_stat* pa_memblock_stat_new(void) {
- pa_memblock_stat *s;
+int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
+ assert(p);
- s = pa_xmalloc(sizeof(pa_memblock_stat));
- s->ref = 1;
- s->total = s->total_size = s->allocated = s->allocated_size = 0;
+ if (!p->memory.shared)
+ return -1;
- return s;
+ *id = p->memory.id;
+
+ return 0;
+}
+
+/* For recieving blocks from other nodes */
+pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
+ pa_memimport *i;
+
+ assert(p);
+ assert(cb);
+
+ i = pa_xnew(pa_memimport, 1);
+ i->pool = p;
+ i->segments = pa_hashmap_new(NULL, NULL);
+ i->blocks = pa_hashmap_new(NULL, NULL);
+ i->release_cb = cb;
+ i->userdata = userdata;
+
+ PA_LLIST_PREPEND(pa_memimport, p->imports, i);
+ return i;
}
-void pa_memblock_stat_unref(pa_memblock_stat *s) {
- assert(s && s->ref >= 1);
+static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
+
+static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
+ pa_memimport_segment* seg;
- if (!(--(s->ref))) {
- assert(!s->total);
- pa_xfree(s);
+ if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
+ return NULL;
+
+ seg = pa_xnew(pa_memimport_segment, 1);
+
+ if (pa_shm_attach_ro(&seg->memory, shm_id) < 0) {
+ pa_xfree(seg);
+ return NULL;
}
+
+ seg->import = i;
+ seg->n_blocks = 0;
+
+ pa_hashmap_put(i->segments, PA_UINT32_TO_PTR(shm_id), seg);
+ return seg;
+}
+
+static void segment_detach(pa_memimport_segment *seg) {
+ assert(seg);
+
+ pa_hashmap_remove(seg->import->segments, PA_UINT32_TO_PTR(seg->memory.id));
+ pa_shm_free(&seg->memory);
+ pa_xfree(seg);
+}
+
+void pa_memimport_free(pa_memimport *i) {
+ pa_memexport *e;
+ pa_memblock *b;
+
+ assert(i);
+
+ /* If we've exported this block further we need to revoke that export */
+ for (e = i->pool->exports; e; e = e->next)
+ memexport_revoke_blocks(e, i);
+
+ while ((b = pa_hashmap_get_first(i->blocks)))
+ memblock_replace_import(b);
+
+ assert(pa_hashmap_size(i->segments) == 0);
+
+ pa_hashmap_free(i->blocks, NULL, NULL);
+ pa_hashmap_free(i->segments, NULL, NULL);
+
+ PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);
+ pa_xfree(i);
}
-pa_memblock_stat * pa_memblock_stat_ref(pa_memblock_stat *s) {
- assert(s);
- s->ref++;
- return s;
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
+ pa_memblock *b;
+ pa_memimport_segment *seg;
+
+ assert(i);
+
+ if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
+ return NULL;
+
+ if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
+ if (!(seg = segment_attach(i, shm_id)))
+ return NULL;
+
+ if (offset+size > seg->memory.size)
+ return NULL;
+
+ b = pa_xnew(pa_memblock, 1);
+ b->type = PA_MEMBLOCK_IMPORTED;
+ b->read_only = 1;
+ b->ref = 1;
+ b->length = size;
+ b->data = (uint8_t*) seg->memory.ptr + offset;
+ b->pool = i->pool;
+ b->per_type.imported.id = block_id;
+ b->per_type.imported.segment = seg;
+
+ pa_hashmap_put(i->blocks, PA_UINT32_TO_PTR(block_id), b);
+
+ seg->n_blocks++;
+
+ stat_add(b);
+
+ return b;
+}
+
+int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
+ pa_memblock *b;
+ assert(i);
+
+ if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id))))
+ return -1;
+
+ memblock_replace_import(b);
+ return 0;
+}
+
+/* For sending blocks to other nodes */
+pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) {
+ pa_memexport *e;
+
+ assert(p);
+ assert(cb);
+
+ if (!p->memory.shared)
+ return NULL;
+
+ e = pa_xnew(pa_memexport, 1);
+ e->pool = p;
+ PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
+ PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
+ e->n_init = 0;
+ e->revoke_cb = cb;
+ e->userdata = userdata;
+
+ PA_LLIST_PREPEND(pa_memexport, p->exports, e);
+ return e;
+}
+
+void pa_memexport_free(pa_memexport *e) {
+ assert(e);
+
+ while (e->used_slots)
+ pa_memexport_process_release(e, e->used_slots - e->slots);
+
+ PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
+ pa_xfree(e);
+}
+
+int pa_memexport_process_release(pa_memexport *e, uint32_t id) {
+ assert(e);
+
+ if (id >= e->n_init)
+ return -1;
+
+ if (!e->slots[id].block)
+ return -1;
+
+/* pa_log("Processing release for %u", id); */
+
+ assert(e->pool->stat.n_exported > 0);
+ assert(e->pool->stat.exported_size >= e->slots[id].block->length);
+
+ e->pool->stat.n_exported --;
+ e->pool->stat.exported_size -= e->slots[id].block->length;
+
+ pa_memblock_unref(e->slots[id].block);
+ e->slots[id].block = NULL;
+
+ PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]);
+ PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]);
+
+ return 0;
+}
+
+static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
+ struct memexport_slot *slot, *next;
+ assert(e);
+ assert(i);
+
+ for (slot = e->used_slots; slot; slot = next) {
+ uint32_t idx;
+ next = slot->next;
+
+ if (slot->block->type != PA_MEMBLOCK_IMPORTED ||
+ slot->block->per_type.imported.segment->import != i)
+ continue;
+
+ idx = slot - e->slots;
+ e->revoke_cb(e, idx, e->userdata);
+ pa_memexport_process_release(e, idx);
+ }
+}
+
+static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
+ pa_memblock *n;
+
+ assert(p);
+ assert(b);
+
+ if (b->type == PA_MEMBLOCK_IMPORTED ||
+ b->type == PA_MEMBLOCK_POOL ||
+ b->type == PA_MEMBLOCK_POOL_EXTERNAL) {
+ assert(b->pool == p);
+ return pa_memblock_ref(b);
+ }
+
+ if (!(n = pa_memblock_new_pool(p, b->length)))
+ return NULL;
+
+ memcpy(n->data, b->data, b->length);
+ return n;
+}
+
+int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {
+ pa_shm *memory;
+ struct memexport_slot *slot;
+
+ assert(e);
+ assert(b);
+ assert(block_id);
+ assert(shm_id);
+ assert(offset);
+ assert(size);
+ assert(b->pool == e->pool);
+
+ if (!(b = memblock_shared_copy(e->pool, b)))
+ return -1;
+
+ if (e->free_slots) {
+ slot = e->free_slots;
+ PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot);
+ } else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX) {
+ slot = &e->slots[e->n_init++];
+ } else {
+ pa_memblock_unref(b);
+ return -1;
+ }
+
+ PA_LLIST_PREPEND(struct memexport_slot, e->used_slots, slot);
+ slot->block = b;
+ *block_id = slot - e->slots;
+
+/* pa_log("Got block id %u", *block_id); */
+
+ if (b->type == PA_MEMBLOCK_IMPORTED) {
+ assert(b->per_type.imported.segment);
+ memory = &b->per_type.imported.segment->memory;
+ } else {
+ assert(b->type == PA_MEMBLOCK_POOL || b->type == PA_MEMBLOCK_POOL_EXTERNAL);
+ assert(b->pool);
+ memory = &b->pool->memory;
+ }
+
+ assert(b->data >= memory->ptr);
+ assert((uint8_t*) b->data + b->length <= (uint8_t*) memory->ptr + memory->size);
+
+ *shm_id = memory->id;
+ *offset = (uint8_t*) b->data - (uint8_t*) memory->ptr;
+ *size = b->length;
+
+ e->pool->stat.n_exported ++;
+ e->pool->stat.exported_size += b->length;
+
+ return 0;
}
diff --git a/src/pulsecore/memblock.h b/src/pulsecore/memblock.h
index 04a0b55b..e63e1e0f 100644
--- a/src/pulsecore/memblock.h
+++ b/src/pulsecore/memblock.h
@@ -1,5 +1,5 @@
-#ifndef foomemblockhfoo
-#define foomemblockhfoo
+#ifndef foopulsememblockhfoo
+#define foopulsememblockhfoo
/* $Id$ */
@@ -25,6 +25,8 @@
#include <sys/types.h>
#include <inttypes.h>
+#include <pulsecore/llist.h>
+
/* A pa_memblock is a reference counted memory block. PulseAudio
* passed references to pa_memblocks around instead of copying
* data. See pa_memchunk for a structure that describes parts of
@@ -32,43 +34,72 @@
/* The type of memory this block points to */
typedef enum pa_memblock_type {
- PA_MEMBLOCK_FIXED, /* data is a pointer to fixed memory that needs not to be freed */
- PA_MEMBLOCK_APPENDED, /* The most common kind: the data is appended to the memory block */
- PA_MEMBLOCK_DYNAMIC, /* data is a pointer to some memory allocated with pa_xmalloc() */
- PA_MEMBLOCK_USER /* User supplied memory, to be freed with free_cb */
+ PA_MEMBLOCK_POOL, /* Memory is part of the memory pool */
+ PA_MEMBLOCK_POOL_EXTERNAL, /* Data memory is part of the memory pool but the pa_memblock structure itself not */
+ PA_MEMBLOCK_APPENDED, /* the data is appended to the memory block */
+ PA_MEMBLOCK_USER, /* User supplied memory, to be freed with free_cb */
+ PA_MEMBLOCK_FIXED, /* data is a pointer to fixed memory that needs not to be freed */
+ PA_MEMBLOCK_IMPORTED, /* Memory is imported from another process via shm */
} pa_memblock_type_t;
-/* A structure of keeping memory block statistics */
-/* Maintains statistics about memory blocks */
-typedef struct pa_memblock_stat {
- int ref;
- unsigned total;
- unsigned total_size;
- unsigned allocated;
- unsigned allocated_size;
-} pa_memblock_stat;
-
-typedef struct pa_memblock {
+typedef struct pa_memblock pa_memblock;
+typedef struct pa_mempool pa_mempool;
+typedef struct pa_mempool_stat pa_mempool_stat;
+typedef struct pa_memimport_segment pa_memimport_segment;
+typedef struct pa_memimport pa_memimport;
+typedef struct pa_memexport pa_memexport;
+
+typedef void (*pa_memimport_release_cb_t)(pa_memimport *i, uint32_t block_id, void *userdata);
+typedef void (*pa_memexport_revoke_cb_t)(pa_memexport *e, uint32_t block_id, void *userdata);
+
+struct pa_memblock {
pa_memblock_type_t type;
- unsigned ref; /* the reference counter */
int read_only; /* boolean */
+ unsigned ref; /* the reference counter */
size_t length;
void *data;
- void (*free_cb)(void *p); /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
- pa_memblock_stat *stat;
-} pa_memblock;
+ pa_mempool *pool;
-/* Allocate a new memory block of type PA_MEMBLOCK_APPENDED */
-pa_memblock *pa_memblock_new(size_t length, pa_memblock_stat*s);
+ union {
+ struct {
+ void (*free_cb)(void *p); /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
+ } user;
+
+ struct {
+ uint32_t id;
+ pa_memimport_segment *segment;
+ } imported;
+ } per_type;
+};
-/* Allocate a new memory block of type PA_MEMBLOCK_DYNAMIC. The pointer data is to be maintained be the memory block */
-pa_memblock *pa_memblock_new_dynamic(void *data, size_t length, pa_memblock_stat*s);
+struct pa_mempool_stat {
+ unsigned n_allocated;
+ unsigned n_accumulated;
+ unsigned n_imported;
+ unsigned n_exported;
+ size_t allocated_size;
+ size_t accumulated_size;
+ size_t imported_size;
+ size_t exported_size;
-/* Allocate a new memory block of type PA_MEMBLOCK_FIXED */
-pa_memblock *pa_memblock_new_fixed(void *data, size_t length, int read_only, pa_memblock_stat*s);
+ unsigned n_too_large_for_pool;
+ unsigned n_pool_full;
+};
+
+/* Allocate a new memory block of type PA_MEMBLOCK_MEMPOOL or PA_MEMBLOCK_APPENDED, depending on the size */
+pa_memblock *pa_memblock_new(pa_mempool *, size_t length);
+
+/* Allocate a new memory block of type PA_MEMBLOCK_MEMPOOL. If the requested size is too large, return NULL */
+pa_memblock *pa_memblock_new_pool(pa_mempool *, size_t length);
/* Allocate a new memory block of type PA_MEMBLOCK_USER */
-pa_memblock *pa_memblock_new_user(void *data, size_t length, void (*free_cb)(void *p), int read_only, pa_memblock_stat*s);
+pa_memblock *pa_memblock_new_user(pa_mempool *, void *data, size_t length, void (*free_cb)(void *p), int read_only);
+
+/* A special case of pa_memblock_new_user: take a memory buffer previously allocated with pa_xmalloc() */
+#define pa_memblock_new_malloced(p,data,length) pa_memblock_new_user(p, data, length, pa_xfree, 0)
+
+/* Allocate a new memory block of type PA_MEMBLOCK_FIXED */
+pa_memblock *pa_memblock_new_fixed(pa_mempool *, void *data, size_t length, int read_only);
void pa_memblock_unref(pa_memblock*b);
pa_memblock* pa_memblock_ref(pa_memblock*b);
@@ -79,8 +110,23 @@ references to the memory. This causes the memory to be copied and
converted into a PA_MEMBLOCK_DYNAMIC type memory block */
void pa_memblock_unref_fixed(pa_memblock*b);
-pa_memblock_stat* pa_memblock_stat_new(void);
-void pa_memblock_stat_unref(pa_memblock_stat *s);
-pa_memblock_stat * pa_memblock_stat_ref(pa_memblock_stat *s);
+/* The memory block manager */
+pa_mempool* pa_mempool_new(int shared);
+void pa_mempool_free(pa_mempool *p);
+const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p);
+void pa_mempool_vacuum(pa_mempool *p);
+int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id);
+
+/* For recieving blocks from other nodes */
+pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata);
+void pa_memimport_free(pa_memimport *i);
+pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size);
+int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id);
+
+/* For sending blocks to other nodes */
+pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata);
+void pa_memexport_free(pa_memexport *e);
+int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t *size);
+int pa_memexport_process_release(pa_memexport *e, uint32_t id);
#endif
diff --git a/src/pulsecore/memblockq.c b/src/pulsecore/memblockq.c
index 822bd66c..2fd38850 100644
--- a/src/pulsecore/memblockq.c
+++ b/src/pulsecore/memblockq.c
@@ -49,7 +49,6 @@ struct pa_memblockq {
size_t maxlength, tlength, base, prebuf, minreq;
int64_t read_index, write_index;
enum { PREBUF, RUNNING } state;
- pa_memblock_stat *memblock_stat;
pa_memblock *silence;
pa_mcalign *mcalign;
};
@@ -61,8 +60,7 @@ pa_memblockq* pa_memblockq_new(
size_t base,
size_t prebuf,
size_t minreq,
- pa_memblock *silence,
- pa_memblock_stat *s) {
+ pa_memblock *silence) {
pa_memblockq* bq;
@@ -75,7 +73,6 @@ pa_memblockq* pa_memblockq_new(
bq->base = base;
bq->read_index = bq->write_index = idx;
- bq->memblock_stat = s;
pa_log_debug(__FILE__": memblockq requested: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu",
(unsigned long)maxlength, (unsigned long)tlength, (unsigned long)base, (unsigned long)prebuf, (unsigned long)minreq);
@@ -586,7 +583,7 @@ int pa_memblockq_push_align(pa_memblockq* bq, const pa_memchunk *chunk) {
return pa_memblockq_push(bq, chunk);
if (!bq->mcalign)
- bq->mcalign = pa_mcalign_new(bq->base, bq->memblock_stat);
+ bq->mcalign = pa_mcalign_new(bq->base);
if (!can_push(bq, pa_mcalign_csize(bq->mcalign, chunk->length)))
return -1;
diff --git a/src/pulsecore/memblockq.h b/src/pulsecore/memblockq.h
index c35b62dd..4d701a80 100644
--- a/src/pulsecore/memblockq.h
+++ b/src/pulsecore/memblockq.h
@@ -69,8 +69,7 @@ pa_memblockq* pa_memblockq_new(
size_t base,
size_t prebuf,
size_t minreq,
- pa_memblock *silence,
- pa_memblock_stat *s);
+ pa_memblock *silence);
void pa_memblockq_free(pa_memblockq*bq);
diff --git a/src/pulsecore/memchunk.c b/src/pulsecore/memchunk.c
index abfc2cab..bcf0ce04 100644
--- a/src/pulsecore/memchunk.c
+++ b/src/pulsecore/memchunk.c
@@ -32,10 +32,13 @@
#include "memchunk.h"
-void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min) {
+void pa_memchunk_make_writable(pa_memchunk *c, size_t min) {
pa_memblock *n;
size_t l;
- assert(c && c->memblock && c->memblock->ref >= 1);
+
+ assert(c);
+ assert(c->memblock);
+ assert(c->memblock->ref >= 1);
if (c->memblock->ref == 1 && !c->memblock->read_only && c->memblock->length >= c->index+min)
return;
@@ -44,7 +47,7 @@ void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min)
if (l < min)
l = min;
- n = pa_memblock_new(l, s);
+ n = pa_memblock_new(c->memblock->pool, l);
memcpy(n->data, (uint8_t*) c->memblock->data + c->index, c->length);
pa_memblock_unref(c->memblock);
c->memblock = n;
diff --git a/src/pulsecore/memchunk.h b/src/pulsecore/memchunk.h
index 1b26c2e6..b8ce6249 100644
--- a/src/pulsecore/memchunk.h
+++ b/src/pulsecore/memchunk.h
@@ -36,7 +36,7 @@ typedef struct pa_memchunk {
/* Make a memchunk writable, i.e. make sure that the caller may have
* exclusive access to the memblock and it is not read_only. If needed
* the memblock in the structure is replaced by a copy. */
-void pa_memchunk_make_writable(pa_memchunk *c, pa_memblock_stat *s, size_t min);
+void pa_memchunk_make_writable(pa_memchunk *c, size_t min);
/* Invalidate a memchunk. This does not free the cotaining memblock,
* but sets all members to zero. */
diff --git a/src/pulsecore/protocol-esound.c b/src/pulsecore/protocol-esound.c
index f1a827bc..2fadeca3 100644
--- a/src/pulsecore/protocol-esound.c
+++ b/src/pulsecore/protocol-esound.c
@@ -377,8 +377,7 @@ static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t
pa_frame_size(&ss),
(size_t) -1,
l/PLAYBACK_BUFFER_FRAGMENTS,
- NULL,
- c->protocol->core->memblock_stat);
+ NULL);
pa_iochannel_socket_set_rcvbuf(c->io, l/PLAYBACK_BUFFER_FRAGMENTS*2);
c->playback.fragment_size = l/10;
@@ -469,8 +468,7 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co
pa_frame_size(&ss),
1,
0,
- NULL,
- c->protocol->core->memblock_stat);
+ NULL);
pa_iochannel_socket_set_sndbuf(c->io, l/RECORD_BUFFER_FRAGMENTS*2);
c->source_output->push = source_output_push_cb;
@@ -722,7 +720,7 @@ static int esd_proto_sample_cache(struct connection *c, PA_GCC_UNUSED esd_proto_
CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name.");
assert(!c->scache.memchunk.memblock);
- c->scache.memchunk.memblock = pa_memblock_new(sc_length, c->protocol->core->memblock_stat);
+ c->scache.memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, sc_length);
c->scache.memchunk.index = 0;
c->scache.memchunk.length = sc_length;
c->scache.sample_spec = ss;
@@ -941,7 +939,7 @@ static int do_read(struct connection *c) {
}
if (!c->playback.current_memblock) {
- c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat);
+ c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
c->playback.memblock_index = 0;
}
diff --git a/src/pulsecore/protocol-native.c b/src/pulsecore/protocol-native.c
index 0b79892c..2c9b3566 100644
--- a/src/pulsecore/protocol-native.c
+++ b/src/pulsecore/protocol-native.c
@@ -348,8 +348,7 @@ static struct record_stream* record_stream_new(
base = pa_frame_size(ss),
1,
0,
- NULL,
- c->protocol->core->memblock_stat);
+ NULL);
assert(s->memblockq);
s->fragment_size = (fragment_size/base)*base;
@@ -448,7 +447,7 @@ static struct playback_stream* playback_stream_new(
start_index = 0;
}
- silence = pa_silence_memblock_new(ss, 0, c->protocol->core->memblock_stat);
+ silence = pa_silence_memblock_new(c->protocol->core->mempool, ss, 0);
s->memblockq = pa_memblockq_new(
start_index,
@@ -457,8 +456,7 @@ static struct playback_stream* playback_stream_new(
pa_frame_size(ss),
prebuf,
minreq,
- silence,
- c->protocol->core->memblock_stat);
+ silence);
pa_memblock_unref(silence);
@@ -1076,6 +1074,7 @@ static void command_drain_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC
static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
struct connection *c = userdata;
pa_tagstruct *reply;
+ const pa_mempool_stat *stat;
assert(c && t);
if (!pa_tagstruct_eof(t)) {
@@ -1085,11 +1084,13 @@ static void command_stat(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
CHECK_VALIDITY(c->pstream, c->authorized, tag, PA_ERR_ACCESS);
+ stat = pa_mempool_get_stat(c->protocol->core->mempool);
+
reply = reply_new(tag);
- pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total);
- pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->total_size);
- pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated);
- pa_tagstruct_putu32(reply, c->protocol->core->memblock_stat->allocated_size);
+ pa_tagstruct_putu32(reply, stat->n_allocated);
+ pa_tagstruct_putu32(reply, stat->allocated_size);
+ pa_tagstruct_putu32(reply, stat->n_accumulated);
+ pa_tagstruct_putu32(reply, stat->accumulated_size);
pa_tagstruct_putu32(reply, pa_scache_total_size(c->protocol->core));
pa_pstream_send_tagstruct(c->pstream, reply);
}
@@ -2256,7 +2257,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
pa_memblock_ref(u->memchunk.memblock);
u->length = 0;
} else {
- u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat);
+ u->memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, u->length);
u->memchunk.index = u->memchunk.length = 0;
}
}
@@ -2349,9 +2350,11 @@ static void on_connection(PA_GCC_UNUSED pa_socket_server*s, pa_iochannel *io, vo
c->client->userdata = c;
c->client->owner = p->module;
- c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->memblock_stat);
+ c->pstream = pa_pstream_new(p->core->mainloop, io, p->core->mempool);
assert(c->pstream);
+ pa_pstream_use_shm(c->pstream, 1);
+
pa_pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
pa_pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
pa_pstream_set_die_callback(c->pstream, pstream_die_callback, c);
diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c
index 3705986d..924ee29e 100644
--- a/src/pulsecore/protocol-simple.c
+++ b/src/pulsecore/protocol-simple.c
@@ -128,7 +128,7 @@ static int do_read(struct connection *c) {
}
if (!c->playback.current_memblock) {
- c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat);
+ c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
c->playback.memblock_index = 0;
}
@@ -369,8 +369,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
pa_frame_size(&p->sample_spec),
(size_t) -1,
l/PLAYBACK_BUFFER_FRAGMENTS,
- NULL,
- p->core->memblock_stat);
+ NULL);
assert(c->input_memblockq);
pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
c->playback.fragment_size = l/10;
@@ -406,8 +405,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
pa_frame_size(&p->sample_spec),
1,
0,
- NULL,
- p->core->memblock_stat);
+ NULL);
pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
pa_source_notify(c->source_output->source);
}
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 7096d65a..421f5de9 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -49,28 +49,45 @@
#include "pstream.h"
+/* We piggyback information if audio data blocks are stored in SHM on the seek mode */
+#define PA_FLAG_SHMDATA 0x80000000LU
+#define PA_FLAG_SHMRELEASE 0x40000000LU
+#define PA_FLAG_SHMREVOKE 0xC0000000LU
+#define PA_FLAG_SHMMASK 0xFF000000LU
+#define PA_FLAG_SEEKMASK 0x000000FFLU
+
+/* The sequence descriptor header consists of 5 32bit integers: */
enum {
PA_PSTREAM_DESCRIPTOR_LENGTH,
PA_PSTREAM_DESCRIPTOR_CHANNEL,
PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
- PA_PSTREAM_DESCRIPTOR_SEEK,
+ PA_PSTREAM_DESCRIPTOR_FLAGS,
PA_PSTREAM_DESCRIPTOR_MAX
};
+/* If we have an SHM block, this info follows the descriptor */
+enum {
+ PA_PSTREAM_SHM_BLOCKID,
+ PA_PSTREAM_SHM_SHMID,
+ PA_PSTREAM_SHM_INDEX,
+ PA_PSTREAM_SHM_LENGTH,
+ PA_PSTREAM_SHM_MAX
+};
+
typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];
#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
#define FRAME_SIZE_MAX PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
struct item_info {
- enum { PA_PSTREAM_ITEM_PACKET, PA_PSTREAM_ITEM_MEMBLOCK } type;
+ enum {
+ PA_PSTREAM_ITEM_PACKET,
+ PA_PSTREAM_ITEM_MEMBLOCK,
+ PA_PSTREAM_ITEM_SHMRELEASE,
+ PA_PSTREAM_ITEM_SHMREVOKE
+ } type;
- /* memblock info */
- pa_memchunk chunk;
- uint32_t channel;
- int64_t offset;
- pa_seek_mode_t seek_mode;
/* packet info */
pa_packet *packet;
@@ -78,6 +95,15 @@ struct item_info {
int with_creds;
pa_creds creds;
#endif
+
+ /* memblock info */
+ pa_memchunk chunk;
+ uint32_t channel;
+ int64_t offset;
+ pa_seek_mode_t seek_mode;
+
+ /* release/revoke info */
+ uint32_t block_id;
};
struct pa_pstream {
@@ -91,20 +117,26 @@ struct pa_pstream {
int dead;
struct {
- struct item_info* current;
pa_pstream_descriptor descriptor;
+ struct item_info* current;
+ uint32_t shm_info[PA_PSTREAM_SHM_MAX];
void *data;
size_t index;
} write;
struct {
+ pa_pstream_descriptor descriptor;
pa_memblock *memblock;
pa_packet *packet;
- pa_pstream_descriptor descriptor;
+ uint32_t shm_info[PA_PSTREAM_SHM_MAX];
void *data;
size_t index;
} read;
+ int use_shm;
+ pa_memimport *import;
+ pa_memexport *export;
+
pa_pstream_packet_cb_t recieve_packet_callback;
void *recieve_packet_callback_userdata;
@@ -117,7 +149,7 @@ struct pa_pstream {
pa_pstream_notify_cb_t die_callback;
void *die_callback_userdata;
- pa_memblock_stat *memblock_stat;
+ pa_mempool *mempool;
#ifdef HAVE_CREDS
pa_creds read_creds, write_creds;
@@ -178,16 +210,19 @@ static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata)
do_something(p);
}
-pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s) {
+static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);
+
+pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
pa_pstream *p;
+
+ assert(m);
assert(io);
+ assert(pool);
p = pa_xnew(pa_pstream, 1);
-
p->ref = 1;
p->io = io;
pa_iochannel_set_callback(io, io_callback, p);
-
p->dead = 0;
p->mainloop = m;
@@ -199,24 +234,24 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_sta
p->write.current = NULL;
p->write.index = 0;
-
p->read.memblock = NULL;
p->read.packet = NULL;
p->read.index = 0;
p->recieve_packet_callback = NULL;
p->recieve_packet_callback_userdata = NULL;
-
p->recieve_memblock_callback = NULL;
p->recieve_memblock_callback_userdata = NULL;
-
p->drain_callback = NULL;
p->drain_callback_userdata = NULL;
-
p->die_callback = NULL;
p->die_callback_userdata = NULL;
- p->memblock_stat = s;
+ p->mempool = pool;
+
+ p->use_shm = 0;
+ p->export = NULL;
+ p->import = NULL;
pa_iochannel_socket_set_rcvbuf(io, 1024*8);
pa_iochannel_socket_set_sndbuf(io, 1024*8);
@@ -235,8 +270,7 @@ static void item_free(void *item, PA_GCC_UNUSED void *p) {
if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
assert(i->chunk.memblock);
pa_memblock_unref(i->chunk.memblock);
- } else {
- assert(i->type == PA_PSTREAM_ITEM_PACKET);
+ } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
assert(i->packet);
pa_packet_unref(i->packet);
}
@@ -265,16 +299,18 @@ static void pstream_free(pa_pstream *p) {
void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
struct item_info *i;
- assert(p && packet && p->ref >= 1);
+
+ assert(p);
+ assert(p->ref >= 1);
+ assert(packet);
if (p->dead)
return;
-/* pa_log(__FILE__": push-packet %p", packet); */
-
i = pa_xnew(struct item_info, 1);
i->type = PA_PSTREAM_ITEM_PACKET;
i->packet = pa_packet_ref(packet);
+
#ifdef HAVE_CREDS
if ((i->with_creds = !!creds))
i->creds = *creds;
@@ -286,13 +322,15 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre
void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
struct item_info *i;
- assert(p && channel != (uint32_t) -1 && chunk && p->ref >= 1);
+
+ assert(p);
+ assert(p->ref >= 1);
+ assert(channel != (uint32_t) -1);
+ assert(chunk);
if (p->dead)
return;
-
-/* pa_log(__FILE__": push-memblock %p", chunk); */
-
+
i = pa_xnew(struct item_info, 1);
i->type = PA_PSTREAM_ITEM_MEMBLOCK;
i->chunk = *chunk;
@@ -309,6 +347,52 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa
p->mainloop->defer_enable(p->defer_event, 1);
}
+static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
+ struct item_info *item;
+ pa_pstream *p = userdata;
+
+ assert(p);
+ assert(p->ref >= 1);
+
+ if (p->dead)
+ return;
+
+/* pa_log(__FILE__": Releasing block %u", block_id); */
+
+ item = pa_xnew(struct item_info, 1);
+ item->type = PA_PSTREAM_ITEM_SHMRELEASE;
+ item->block_id = block_id;
+#ifdef HAVE_CREDS
+ item->with_creds = 0;
+#endif
+
+ pa_queue_push(p->send_queue, item);
+ p->mainloop->defer_enable(p->defer_event, 1);
+}
+
+static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
+ struct item_info *item;
+ pa_pstream *p = userdata;
+
+ assert(p);
+ assert(p->ref >= 1);
+
+ if (p->dead)
+ return;
+
+/* pa_log(__FILE__": Revoking block %u", block_id); */
+
+ item = pa_xnew(struct item_info, 1);
+ item->type = PA_PSTREAM_ITEM_SHMREVOKE;
+ item->block_id = block_id;
+#ifdef HAVE_CREDS
+ item->with_creds = 0;
+#endif
+
+ pa_queue_push(p->send_queue, item);
+ p->mainloop->defer_enable(p->defer_event, 1);
+}
+
static void prepare_next_write_item(pa_pstream *p) {
assert(p);
@@ -316,27 +400,77 @@ static void prepare_next_write_item(pa_pstream *p) {
return;
p->write.index = 0;
+ p->write.data = NULL;
+
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;
if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {
- /*pa_log(__FILE__": pop-packet %p", p->write.current->packet);*/
assert(p->write.current->packet);
p->write.data = p->write.current->packet->data;
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = 0;
+ } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {
+
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
+
+ } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {
+
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);
} else {
- assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
- p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
+ uint32_t flags;
+ int send_payload = 1;
+
+ assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
+ assert(p->write.current->chunk.memblock);
+
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));
- p->write.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] = htonl(p->write.current->seek_mode);
+
+ flags = p->write.current->seek_mode & PA_FLAG_SEEKMASK;
+
+ if (p->use_shm) {
+ uint32_t block_id, shm_id;
+ size_t offset, length;
+
+ assert(p->export);
+
+ if (pa_memexport_put(p->export,
+ p->write.current->chunk.memblock,
+ &block_id,
+ &shm_id,
+ &offset,
+ &length) >= 0) {
+
+ flags |= PA_FLAG_SHMDATA;
+ send_payload = 0;
+
+ p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
+ p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
+ p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
+ p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
+
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
+ p->write.data = p->write.shm_info;
+ }
+/* else */
+/* pa_log_warn(__FILE__": Failed to export memory block."); */
+ }
+
+ if (send_payload) {
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
+ p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
+ }
+
+ p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
}
#ifdef HAVE_CREDS
@@ -344,7 +478,6 @@ static void prepare_next_write_item(pa_pstream *p) {
p->write_creds = p->write.current->creds;
#endif
-
}
static int do_write(pa_pstream *p) {
@@ -359,16 +492,18 @@ static int do_write(pa_pstream *p) {
if (!p->write.current)
return 0;
- assert(p->write.data);
-
if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
d = (uint8_t*) p->write.descriptor + p->write.index;
l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
} else {
+ assert(p->write.data);
+
d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
}
+ assert(l > 0);
+
#ifdef HAVE_CREDS
if (p->send_creds_now) {
@@ -384,7 +519,7 @@ static int do_write(pa_pstream *p) {
p->write.index += r;
- if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE+ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
+ if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
assert(p->write.current);
item_free(p->write.current, (void *) 1);
p->write.current = NULL;
@@ -428,27 +563,87 @@ static int do_read(pa_pstream *p) {
p->read.index += r;
if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
+ uint32_t flags, length, channel;
/* Reading of frame descriptor complete */
- /* Frame size too large */
- if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) > FRAME_SIZE_MAX) {
- pa_log_warn(__FILE__": Frame size too large: %lu > %lu", (unsigned long) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), (unsigned long) FRAME_SIZE_MAX);
+ flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+
+ if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
+ pa_log_warn(__FILE__": Recieved SHM frame on a socket where SHM is disabled.");
+ return -1;
+ }
+
+ if (flags == PA_FLAG_SHMRELEASE) {
+
+ /* This is a SHM memblock release frame with no payload */
+
+/* pa_log(__FILE__": Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+
+ assert(p->export);
+ pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+
+ goto frame_done;
+
+ } else if (flags == PA_FLAG_SHMREVOKE) {
+
+ /* This is a SHM memblock revoke frame with no payload */
+
+/* pa_log(__FILE__": Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+
+ assert(p->import);
+ pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+
+ goto frame_done;
+ }
+
+ length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
+
+ if (length > FRAME_SIZE_MAX) {
+ pa_log_warn(__FILE__": Recieved invalid frame size : %lu", (unsigned long) length);
return -1;
}
assert(!p->read.packet && !p->read.memblock);
- if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
+ channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
+
+ if (channel == (uint32_t) -1) {
+
+ if (flags != 0) {
+ pa_log_warn(__FILE__": Received packet frame with invalid flags value.");
+ return -1;
+ }
+
/* Frame is a packet frame */
- p->read.packet = pa_packet_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]));
+ p->read.packet = pa_packet_new(length);
p->read.data = p->read.packet->data;
+
} else {
- /* Frame is a memblock frame */
- p->read.memblock = pa_memblock_new(ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]), p->memblock_stat);
- p->read.data = p->read.memblock->data;
- if (ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]) > PA_SEEK_RELATIVE_END) {
- pa_log_warn(__FILE__": Invalid seek mode");
+ if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
+ pa_log_warn(__FILE__": Received memblock frame with invalid seek mode.");
+ return -1;
+ }
+
+ if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
+
+ if (length != sizeof(p->read.shm_info)) {
+ pa_log_warn(__FILE__": Recieved SHM memblock frame with Invalid frame length.");
+ return -1;
+ }
+
+ /* Frame is a memblock frame referencing an SHM memblock */
+ p->read.data = p->read.shm_info;
+
+ } else if ((flags & PA_FLAG_SHMMASK) == 0) {
+
+ /* Frame is a memblock frame */
+
+ p->read.memblock = pa_memblock_new(p->mempool, length);
+ p->read.data = p->read.memblock->data;
+ } else {
+
+ pa_log_warn(__FILE__": Recieved memblock frame with invalid flags value.");
return -1;
}
}
@@ -456,7 +651,9 @@ static int do_read(pa_pstream *p) {
} else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
/* Frame payload available */
- if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
+ if (p->read.memblock && p->recieve_memblock_callback) {
+
+ /* Is this memblock data? Than pass it to the user */
l = (p->read.index - r) < PA_PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
if (l > 0) {
@@ -477,13 +674,13 @@ static int do_read(pa_pstream *p) {
p,
ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
offset,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK]),
+ ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
&chunk,
p->recieve_memblock_callback_userdata);
}
/* Drop seek info for following callbacks */
- p->read.descriptor[PA_PSTREAM_DESCRIPTOR_SEEK] =
+ p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
}
@@ -491,13 +688,13 @@ static int do_read(pa_pstream *p) {
/* Frame complete */
if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
+
if (p->read.memblock) {
- assert(!p->read.packet);
-
+
+ /* This was a memblock frame. We can unref the memblock now */
pa_memblock_unref(p->read.memblock);
- p->read.memblock = NULL;
- } else {
- assert(p->read.packet);
+
+ } else if (p->read.packet) {
if (p->recieve_packet_callback)
#ifdef HAVE_CREDS
@@ -507,17 +704,63 @@ static int do_read(pa_pstream *p) {
#endif
pa_packet_unref(p->read.packet);
- p->read.packet = NULL;
+ } else {
+ pa_memblock *b;
+
+ assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
+
+ assert(p->import);
+
+ if (!(b = pa_memimport_get(p->import,
+ ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
+ ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
+ ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
+ ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
+
+ pa_log_warn(__FILE__": Failed to import memory block.");
+ return -1;
+ }
+
+ if (p->recieve_memblock_callback) {
+ int64_t offset;
+ pa_memchunk chunk;
+
+ chunk.memblock = b;
+ chunk.index = 0;
+ chunk.length = b->length;
+
+ offset = (int64_t) (
+ (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
+ (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
+
+ p->recieve_memblock_callback(
+ p,
+ ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+ offset,
+ ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
+ &chunk,
+ p->recieve_memblock_callback_userdata);
+ }
+
+ pa_memblock_unref(b);
}
- p->read.index = 0;
-#ifdef HAVE_CREDS
- p->read_creds_valid = 0;
-#endif
+ goto frame_done;
}
}
- return 0;
+ return 0;
+
+frame_done:
+ p->read.memblock = NULL;
+ p->read.packet = NULL;
+ p->read.index = 0;
+
+#ifdef HAVE_CREDS
+ p->read_creds_valid = 0;
+#endif
+
+ return 0;
}
void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
@@ -583,6 +826,16 @@ void pa_pstream_close(pa_pstream *p) {
p->dead = 1;
+ if (p->import) {
+ pa_memimport_free(p->import);
+ p->import = NULL;
+ }
+
+ if (p->export) {
+ pa_memexport_free(p->export);
+ p->export = NULL;
+ }
+
if (p->io) {
pa_iochannel_free(p->io);
p->io = NULL;
@@ -597,4 +850,19 @@ void pa_pstream_close(pa_pstream *p) {
p->drain_callback = NULL;
p->recieve_packet_callback = NULL;
p->recieve_memblock_callback = NULL;
+
+
+}
+
+void pa_pstream_use_shm(pa_pstream *p, int enable) {
+ assert(p);
+ assert(p->ref >= 1);
+
+ p->use_shm = enable;
+
+ if (!p->import)
+ p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);
+
+ if (!p->export)
+ p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);
}
diff --git a/src/pulsecore/pstream.h b/src/pulsecore/pstream.h
index 789e40bc..26bb7699 100644
--- a/src/pulsecore/pstream.h
+++ b/src/pulsecore/pstream.h
@@ -39,7 +39,7 @@ typedef void (*pa_pstream_packet_cb_t)(pa_pstream *p, pa_packet *packet, const p
typedef void (*pa_pstream_memblock_cb_t)(pa_pstream *p, uint32_t channel, int64_t offset, pa_seek_mode_t seek, const pa_memchunk *chunk, void *userdata);
typedef void (*pa_pstream_notify_cb_t)(pa_pstream *p, void *userdata);
-pa_pstream* pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_memblock_stat *s);
+pa_pstream* pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *p);
void pa_pstream_unref(pa_pstream*p);
pa_pstream* pa_pstream_ref(pa_pstream*p);
@@ -54,6 +54,8 @@ void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void
int pa_pstream_is_pending(pa_pstream *p);
+void pa_pstream_use_shm(pa_pstream *p, int enable);
+
void pa_pstream_close(pa_pstream *p);
#endif
diff --git a/src/pulsecore/resampler.c b/src/pulsecore/resampler.c
index 23cdf381..74226714 100644
--- a/src/pulsecore/resampler.c
+++ b/src/pulsecore/resampler.c
@@ -42,7 +42,7 @@ struct pa_resampler {
pa_sample_spec i_ss, o_ss;
pa_channel_map i_cm, o_cm;
size_t i_fz, o_fz;
- pa_memblock_stat *memblock_stat;
+ pa_mempool *mempool;
void (*impl_free)(pa_resampler *r);
void (*impl_update_input_rate)(pa_resampler *r, uint32_t rate);
@@ -71,15 +71,16 @@ static int libsamplerate_init(pa_resampler*r);
static int trivial_init(pa_resampler*r);
pa_resampler* pa_resampler_new(
- const pa_sample_spec *a,
- const pa_channel_map *am,
- const pa_sample_spec *b,
- const pa_channel_map *bm,
- pa_memblock_stat *s,
- pa_resample_method_t resample_method) {
+ pa_mempool *pool,
+ const pa_sample_spec *a,
+ const pa_channel_map *am,
+ const pa_sample_spec *b,
+ const pa_channel_map *bm,
+ pa_resample_method_t resample_method) {
pa_resampler *r = NULL;
+ assert(pool);
assert(a);
assert(b);
assert(pa_sample_spec_valid(a));
@@ -88,7 +89,7 @@ pa_resampler* pa_resampler_new(
r = pa_xnew(pa_resampler, 1);
r->impl_data = NULL;
- r->memblock_stat = s;
+ r->mempool = pool;
r->resample_method = resample_method;
r->impl_free = NULL;
@@ -450,7 +451,7 @@ static void libsamplerate_run(pa_resampler *r, const pa_memchunk *in, pa_memchun
assert(p);
/* Take the existing buffer and make it a memblock */
- out->memblock = pa_memblock_new_dynamic(*p, out->length, r->memblock_stat);
+ out->memblock = pa_memblock_new_malloced(r->mempool, *p, out->length);
*p = NULL;
}
} else {
@@ -549,7 +550,7 @@ static void trivial_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out
l = ((((n_frames+1) * r->o_ss.rate) / r->i_ss.rate) + 1) * fz;
out->index = 0;
- out->memblock = pa_memblock_new(l, r->memblock_stat);
+ out->memblock = pa_memblock_new(r->mempool, l);
for (o_index = 0;; o_index++, u->o_counter++) {
unsigned j;
diff --git a/src/pulsecore/resampler.h b/src/pulsecore/resampler.h
index c1199e5c..327e24a2 100644
--- a/src/pulsecore/resampler.h
+++ b/src/pulsecore/resampler.h
@@ -43,12 +43,12 @@ typedef enum pa_resample_method {
} pa_resample_method_t;
pa_resampler* pa_resampler_new(
- const pa_sample_spec *a,
- const pa_channel_map *am,
- const pa_sample_spec *b,
- const pa_channel_map *bm,
- pa_memblock_stat *s,
- pa_resample_method_t resample_method);
+ pa_mempool *pool,
+ const pa_sample_spec *a,
+ const pa_channel_map *am,
+ const pa_sample_spec *b,
+ const pa_channel_map *bm,
+ pa_resample_method_t resample_method);
void pa_resampler_free(pa_resampler *r);
diff --git a/src/pulsecore/sample-util.c b/src/pulsecore/sample-util.c
index 638f8067..7f5d8a02 100644
--- a/src/pulsecore/sample-util.c
+++ b/src/pulsecore/sample-util.c
@@ -35,13 +35,14 @@
#include "sample-util.h"
#include "endianmacros.h"
-pa_memblock *pa_silence_memblock_new(const pa_sample_spec *spec, size_t length, pa_memblock_stat*s) {
+pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spec, size_t length) {
+ assert(pool);
assert(spec);
if (length == 0)
- length = pa_bytes_per_second(spec)/10; /* 100 ms */
+ length = pa_bytes_per_second(spec)/20; /* 50 ms */
- return pa_silence_memblock(pa_memblock_new(length, s), spec);
+ return pa_silence_memblock(pa_memblock_new(pool, length), spec);
}
pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec) {
diff --git a/src/pulsecore/sample-util.h b/src/pulsecore/sample-util.h
index 3ebb7e2e..6b770792 100644
--- a/src/pulsecore/sample-util.h
+++ b/src/pulsecore/sample-util.h
@@ -28,7 +28,7 @@
#include <pulsecore/memchunk.h>
pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec);
-pa_memblock *pa_silence_memblock_new(const pa_sample_spec *spec, size_t length, pa_memblock_stat*s);
+pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spec, size_t length);
void pa_silence_memchunk(pa_memchunk *c, const pa_sample_spec *spec);
void pa_silence_memory(void *p, size_t length, const pa_sample_spec *spec);
diff --git a/src/pulsecore/sink-input.c b/src/pulsecore/sink-input.c
index b3fabad3..b5ba9df1 100644
--- a/src/pulsecore/sink-input.c
+++ b/src/pulsecore/sink-input.c
@@ -136,9 +136,9 @@ pa_sink_input* pa_sink_input_new(
!pa_channel_map_equal(&data->channel_map, &data->sink->channel_map))
if (!(resampler = pa_resampler_new(
+ core->mempool,
&data->sample_spec, &data->channel_map,
&data->sink->sample_spec, &data->sink->channel_map,
- core->memblock_stat,
data->resample_method))) {
pa_log_warn(__FILE__": Unsupported resampling operation.");
return NULL;
@@ -299,7 +299,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
* while until the old sink has drained its playback buffer */
if (!i->silence_memblock)
- i->silence_memblock = pa_silence_memblock_new(&i->sink->sample_spec, SILENCE_BUFFER_LENGTH, i->sink->core->memblock_stat);
+ i->silence_memblock = pa_silence_memblock_new(i->sink->core->mempool, &i->sink->sample_spec, SILENCE_BUFFER_LENGTH);
chunk->memblock = pa_memblock_ref(i->silence_memblock);
chunk->index = 0;
@@ -338,7 +338,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
/* It might be necessary to adjust the volume here */
if (do_volume_adj_here && !volume_is_norm) {
- pa_memchunk_make_writable(&tchunk, i->sink->core->memblock_stat, 0);
+ pa_memchunk_make_writable(&tchunk, 0);
pa_volume_memchunk(&tchunk, &i->sample_spec, &i->volume);
}
@@ -540,9 +540,9 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
/* Okey, we need a new resampler for the new sink */
if (!(new_resampler = pa_resampler_new(
+ dest->core->mempool,
&i->sample_spec, &i->channel_map,
&dest->sample_spec, &dest->channel_map,
- dest->core->memblock_stat,
i->resample_method))) {
pa_log_warn(__FILE__": Unsupported resampling operation.");
return -1;
@@ -553,7 +553,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
pa_usec_t old_latency, new_latency;
pa_usec_t silence_usec = 0;
- buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL, NULL);
+ buffer = pa_memblockq_new(0, MOVE_BUFFER_LENGTH, 0, pa_frame_size(&origin->sample_spec), 0, 0, NULL);
/* Let's do a little bit of Voodoo for compensating latency
* differences */
@@ -599,7 +599,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
chunk.length = n;
if (!volume_is_norm) {
- pa_memchunk_make_writable(&chunk, origin->core->memblock_stat, 0);
+ pa_memchunk_make_writable(&chunk, 0);
pa_volume_memchunk(&chunk, &origin->sample_spec, &volume);
}
diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c
index 557d5efc..aacb89fd 100644
--- a/src/pulsecore/sink.c
+++ b/src/pulsecore/sink.c
@@ -298,14 +298,14 @@ int pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {
pa_sw_cvolume_multiply(&volume, &s->sw_volume, &info[0].volume);
if (s->sw_muted || !pa_cvolume_is_norm(&volume)) {
- pa_memchunk_make_writable(result, s->core->memblock_stat, 0);
+ pa_memchunk_make_writable(result, 0);
if (s->sw_muted)
pa_silence_memchunk(result, &s->sample_spec);
else
pa_volume_memchunk(result, &s->sample_spec, &volume);
}
} else {
- result->memblock = pa_memblock_new(length, s->core->memblock_stat);
+ result->memblock = pa_memblock_new(s->core->mempool, length);
assert(result->memblock);
/* pa_log("mixing %i", n); */
@@ -429,7 +429,7 @@ void pa_sink_render_full(pa_sink *s, size_t length, pa_memchunk *result) {
/*** This needs optimization ***/
- result->memblock = pa_memblock_new(result->length = length, s->core->memblock_stat);
+ result->memblock = pa_memblock_new(s->core->mempool, result->length = length);
result->index = 0;
pa_sink_render_into_full(s, result);
diff --git a/src/pulsecore/sound-file-stream.c b/src/pulsecore/sound-file-stream.c
index 6063b93e..6782f50e 100644
--- a/src/pulsecore/sound-file-stream.c
+++ b/src/pulsecore/sound-file-stream.c
@@ -75,7 +75,7 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {
uint32_t fs = pa_frame_size(&i->sample_spec);
sf_count_t n;
- u->memchunk.memblock = pa_memblock_new(BUF_SIZE, i->sink->core->memblock_stat);
+ u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE);
u->memchunk.index = 0;
if (u->readf_function) {
diff --git a/src/pulsecore/sound-file.c b/src/pulsecore/sound-file.c
index d11d4b9d..256cce43 100644
--- a/src/pulsecore/sound-file.c
+++ b/src/pulsecore/sound-file.c
@@ -34,7 +34,7 @@
#include "sound-file.h"
#include "core-scache.h"
-int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk, pa_memblock_stat *s) {
+int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk) {
SNDFILE*sf = NULL;
SF_INFO sfinfo;
int ret = -1;
@@ -92,7 +92,7 @@ int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *ma
goto finish;
}
- chunk->memblock = pa_memblock_new(l, s);
+ chunk->memblock = pa_memblock_new(pool, l);
assert(chunk->memblock);
chunk->index = 0;
chunk->length = l;
diff --git a/src/pulsecore/sound-file.h b/src/pulsecore/sound-file.h
index 0b81d97e..7e3c82ea 100644
--- a/src/pulsecore/sound-file.h
+++ b/src/pulsecore/sound-file.h
@@ -26,7 +26,7 @@
#include <pulse/channelmap.h>
#include <pulsecore/memchunk.h>
-int pa_sound_file_load(const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk, pa_memblock_stat *s);
+int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss, pa_channel_map *map, pa_memchunk *chunk);
int pa_sound_file_too_big_to_cache(const char *fname);
diff --git a/src/pulsecore/source-output.c b/src/pulsecore/source-output.c
index 7371474f..f9d66f6d 100644
--- a/src/pulsecore/source-output.c
+++ b/src/pulsecore/source-output.c
@@ -115,9 +115,9 @@ pa_source_output* pa_source_output_new(
if (!pa_sample_spec_equal(&data->sample_spec, &data->source->sample_spec) ||
!pa_channel_map_equal(&data->channel_map, &data->source->channel_map))
if (!(resampler = pa_resampler_new(
+ core->mempool,
&data->source->sample_spec, &data->source->channel_map,
&data->sample_spec, &data->channel_map,
- core->memblock_stat,
data->resample_method))) {
pa_log_warn(__FILE__": Unsupported resampling operation.");
return NULL;
@@ -330,9 +330,9 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest) {
/* Okey, we need a new resampler for the new sink */
if (!(new_resampler = pa_resampler_new(
+ dest->core->mempool,
&dest->sample_spec, &dest->channel_map,
&o->sample_spec, &o->channel_map,
- dest->core->memblock_stat,
o->resample_method))) {
pa_log_warn(__FILE__": Unsupported resampling operation.");
return -1;
diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c
index 0d55da44..cb5b1030 100644
--- a/src/pulsecore/source.c
+++ b/src/pulsecore/source.c
@@ -211,7 +211,7 @@ void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
pa_memchunk vchunk = *chunk;
pa_memblock_ref(vchunk.memblock);
- pa_memchunk_make_writable(&vchunk, s->core->memblock_stat, 0);
+ pa_memchunk_make_writable(&vchunk, 0);
if (s->sw_muted)
pa_silence_memchunk(&vchunk, &s->sample_spec);
else