From f44ba092651aa75055e109e04b4164ea92ae7fdc Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Mon, 19 Jun 2006 21:53:48 +0000 Subject: big s/polyp/pulse/g git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1033 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/protocol-simple.c | 494 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 494 insertions(+) create mode 100644 src/pulsecore/protocol-simple.c (limited to 'src/pulsecore/protocol-simple.c') diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c new file mode 100644 index 00000000..4d73bd24 --- /dev/null +++ b/src/pulsecore/protocol-simple.c @@ -0,0 +1,494 @@ +/* $Id$ */ + +/*** + This file is part of PulseAudio. + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + USA. +***/ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "protocol-simple.h" + +/* Don't allow more than this many concurrent connections */ +#define MAX_CONNECTIONS 10 + +struct connection { + pa_protocol_simple *protocol; + pa_iochannel *io; + pa_sink_input *sink_input; + pa_source_output *source_output; + pa_client *client; + pa_memblockq *input_memblockq, *output_memblockq; + pa_defer_event *defer_event; + + int dead; + + struct { + pa_memblock *current_memblock; + size_t memblock_index, fragment_size; + } playback; +}; + +struct pa_protocol_simple { + pa_module *module; + pa_core *core; + pa_socket_server*server; + pa_idxset *connections; + enum { + RECORD = 1, + PLAYBACK = 2, + DUPLEX = 3 + } mode; + pa_sample_spec sample_spec; + char *source_name, *sink_name; +}; + +#define PLAYBACK_BUFFER_SECONDS (.5) +#define PLAYBACK_BUFFER_FRAGMENTS (10) +#define RECORD_BUFFER_SECONDS (5) +#define RECORD_BUFFER_FRAGMENTS (100) + +static void connection_free(struct connection *c) { + assert(c); + + pa_idxset_remove_by_data(c->protocol->connections, c, NULL); + + if (c->playback.current_memblock) + pa_memblock_unref(c->playback.current_memblock); + if (c->sink_input) { + pa_sink_input_disconnect(c->sink_input); + pa_sink_input_unref(c->sink_input); + } + if (c->source_output) { + pa_source_output_disconnect(c->source_output); + pa_source_output_unref(c->source_output); + } + if (c->client) + pa_client_free(c->client); + if (c->io) + pa_iochannel_free(c->io); + if (c->input_memblockq) + pa_memblockq_free(c->input_memblockq); + if (c->output_memblockq) + pa_memblockq_free(c->output_memblockq); + if (c->defer_event) + c->protocol->core->mainloop->defer_free(c->defer_event); + pa_xfree(c); +} + +static int do_read(struct connection *c) { + pa_memchunk chunk; + ssize_t r; + size_t l; + + if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq))) + return 0; + + if (l > c->playback.fragment_size) + l = c->playback.fragment_size; + + if (c->playback.current_memblock) + if (c->playback.current_memblock->length - c->playback.memblock_index < l) { + pa_memblock_unref(c->playback.current_memblock); + c->playback.current_memblock = NULL; + c->playback.memblock_index = 0; + } + + if (!c->playback.current_memblock) { + c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat); + assert(c->playback.current_memblock && c->playback.current_memblock->length >= l); + c->playback.memblock_index = 0; + } + + if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) { + pa_log_debug(__FILE__": read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno)); + return -1; + } + + chunk.memblock = c->playback.current_memblock; + chunk.index = c->playback.memblock_index; + chunk.length = r; + assert(chunk.memblock); + + c->playback.memblock_index += r; + + assert(c->input_memblockq); + pa_memblockq_push_align(c->input_memblockq, &chunk); + assert(c->sink_input); + pa_sink_notify(c->sink_input->sink); + + return 0; +} + +static int do_write(struct connection *c) { + pa_memchunk chunk; + ssize_t r; + + if (!c->source_output) + return 0; + + assert(c->output_memblockq); + if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0) + return 0; + + assert(chunk.memblock && chunk.length); + + if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) { + pa_memblock_unref(chunk.memblock); + pa_log(__FILE__": write(): %s", pa_cstrerror(errno)); + return -1; + } + + pa_memblockq_drop(c->output_memblockq, &chunk, r); + pa_memblock_unref(chunk.memblock); + + pa_source_notify(c->source_output->source); + + return 0; +} + +static void do_work(struct connection *c) { + assert(c); + + assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); + c->protocol->core->mainloop->defer_enable(c->defer_event, 0); + + if (c->dead) + return; + + if (pa_iochannel_is_readable(c->io)) { + if (do_read(c) < 0) + goto fail; + } else if (pa_iochannel_is_hungup(c->io)) + goto fail; + + if (pa_iochannel_is_writable(c->io)) { + if (do_write(c) < 0) + goto fail; + } + + return; + +fail: + + if (c->sink_input) { + c->dead = 1; + + pa_iochannel_free(c->io); + c->io = NULL; + + pa_memblockq_prebuf_disable(c->input_memblockq); + pa_sink_notify(c->sink_input->sink); + } else + connection_free(c); +} + +/*** sink_input callbacks ***/ + +static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { + struct connection*c; + assert(i && i->userdata && chunk); + c = i->userdata; + + if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) { + + if (c->dead) + connection_free(c); + + return -1; + } + + return 0; +} + +static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) { + struct connection*c = i->userdata; + assert(i && c && length); + + pa_memblockq_drop(c->input_memblockq, chunk, length); + + /* do something */ + assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); + c->protocol->core->mainloop->defer_enable(c->defer_event, 1); +} + +static void sink_input_kill_cb(pa_sink_input *i) { + assert(i && i->userdata); + connection_free((struct connection *) i->userdata); +} + + +static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) { + struct connection*c = i->userdata; + assert(i && c); + return pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec); +} + +/*** source_output callbacks ***/ + +static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { + struct connection *c = o->userdata; + assert(o && c && chunk); + + pa_memblockq_push(c->output_memblockq, chunk); + + /* do something */ + assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); + c->protocol->core->mainloop->defer_enable(c->defer_event, 1); +} + +static void source_output_kill_cb(pa_source_output *o) { + assert(o && o->userdata); + connection_free((struct connection *) o->userdata); +} + +static pa_usec_t source_output_get_latency_cb(pa_source_output *o) { + struct connection*c = o->userdata; + assert(o && c); + return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec); +} + +/*** client callbacks ***/ + +static void client_kill_cb(pa_client *c) { + assert(c && c->userdata); + connection_free((struct connection *) c->userdata); +} + +/*** pa_iochannel callbacks ***/ + +static void io_callback(pa_iochannel*io, void *userdata) { + struct connection *c = userdata; + assert(io && c && c->io == io); + + do_work(c); +} + +/*** fixed callback ***/ + +static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) { + struct connection *c = userdata; + assert(a && c && c->defer_event == e); + + do_work(c); +} + +/*** socket_server callbacks ***/ + +static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) { + pa_protocol_simple *p = userdata; + struct connection *c = NULL; + char cname[256]; + assert(s && io && p); + + if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) { + pa_log(__FILE__": Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS); + pa_iochannel_free(io); + return; + } + + c = pa_xmalloc(sizeof(struct connection)); + c->io = io; + c->sink_input = NULL; + c->source_output = NULL; + c->defer_event = NULL; + c->input_memblockq = c->output_memblockq = NULL; + c->protocol = p; + c->playback.current_memblock = NULL; + c->playback.memblock_index = 0; + c->playback.fragment_size = 0; + c->dead = 0; + + pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname)); + c->client = pa_client_new(p->core, __FILE__, cname); + assert(c->client); + c->client->owner = p->module; + c->client->kill = client_kill_cb; + c->client->userdata = c; + + if (p->mode & PLAYBACK) { + pa_sink *sink; + size_t l; + + if (!(sink = pa_namereg_get(p->core, p->sink_name, PA_NAMEREG_SINK, 1))) { + pa_log(__FILE__": Failed to get sink."); + goto fail; + } + + if (!(c->sink_input = pa_sink_input_new(sink, __FILE__, c->client->name, &p->sample_spec, NULL, NULL, 0, -1))) { + pa_log(__FILE__": Failed to create sink input."); + goto fail; + } + + c->sink_input->owner = p->module; + c->sink_input->client = c->client; + + c->sink_input->peek = sink_input_peek_cb; + c->sink_input->drop = sink_input_drop_cb; + c->sink_input->kill = sink_input_kill_cb; + c->sink_input->get_latency = sink_input_get_latency_cb; + c->sink_input->userdata = c; + + l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS); + c->input_memblockq = pa_memblockq_new( + 0, + l, + 0, + pa_frame_size(&p->sample_spec), + (size_t) -1, + l/PLAYBACK_BUFFER_FRAGMENTS, + NULL, + p->core->memblock_stat); + assert(c->input_memblockq); + pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5); + c->playback.fragment_size = l/10; + } + + if (p->mode & RECORD) { + pa_source *source; + size_t l; + + if (!(source = pa_namereg_get(p->core, p->source_name, PA_NAMEREG_SOURCE, 1))) { + pa_log(__FILE__": Failed to get source."); + goto fail; + } + + c->source_output = pa_source_output_new(source, __FILE__, c->client->name, &p->sample_spec, NULL, -1); + if (!c->source_output) { + pa_log(__FILE__": Failed to create source output."); + goto fail; + } + c->source_output->owner = p->module; + c->source_output->client = c->client; + + c->source_output->push = source_output_push_cb; + c->source_output->kill = source_output_kill_cb; + c->source_output->get_latency = source_output_get_latency_cb; + c->source_output->userdata = c; + + l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS); + c->output_memblockq = pa_memblockq_new( + 0, + l, + 0, + pa_frame_size(&p->sample_spec), + 1, + 0, + NULL, + p->core->memblock_stat); + pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2); + } + + pa_iochannel_set_callback(c->io, io_callback, c); + pa_idxset_put(p->connections, c, NULL); + + c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c); + assert(c->defer_event); + p->core->mainloop->defer_enable(c->defer_event, 0); + + return; + +fail: + if (c) + connection_free(c); +} + +pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) { + pa_protocol_simple* p = NULL; + int enable; + assert(core && server && ma); + + p = pa_xmalloc0(sizeof(pa_protocol_simple)); + p->module = m; + p->core = core; + p->server = server; + p->connections = pa_idxset_new(NULL, NULL); + + p->sample_spec = core->default_sample_spec; + if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) { + pa_log(__FILE__": Failed to parse sample type specification."); + goto fail; + } + + p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL)); + p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL)); + + enable = 0; + if (pa_modargs_get_value_boolean(ma, "record", &enable) < 0) { + pa_log(__FILE__": record= expects a numeric argument."); + goto fail; + } + p->mode = enable ? RECORD : 0; + + enable = 1; + if (pa_modargs_get_value_boolean(ma, "playback", &enable) < 0) { + pa_log(__FILE__": playback= expects a numeric argument."); + goto fail; + } + p->mode |= enable ? PLAYBACK : 0; + + if ((p->mode & (RECORD|PLAYBACK)) == 0) { + pa_log(__FILE__": neither playback nor recording enabled for protocol."); + goto fail; + } + + pa_socket_server_set_callback(p->server, on_connection, p); + + return p; + +fail: + if (p) + pa_protocol_simple_free(p); + return NULL; +} + + +void pa_protocol_simple_free(pa_protocol_simple *p) { + struct connection *c; + assert(p); + + if (p->connections) { + while((c = pa_idxset_first(p->connections, NULL))) + connection_free(c); + + pa_idxset_free(p->connections, NULL, NULL); + } + + if (p->server) + pa_socket_server_unref(p->server); + pa_xfree(p); +} + -- cgit From a621d9028548723d13df64df06a4f4538504e7a3 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Sun, 13 Aug 2006 16:19:56 +0000 Subject: allow hooking into the process of creating playback streams. To implement this I modified the pa_sink_input_new() signature to take a pa_sink_input_new_data structure instead of direct arguments. git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1237 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/protocol-simple.c | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) (limited to 'src/pulsecore/protocol-simple.c') diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 4d73bd24..5071191a 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -340,22 +340,21 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->client->userdata = c; if (p->mode & PLAYBACK) { - pa_sink *sink; + pa_sink_input_new_data data; size_t l; - if (!(sink = pa_namereg_get(p->core, p->sink_name, PA_NAMEREG_SINK, 1))) { - pa_log(__FILE__": Failed to get sink."); - goto fail; - } + pa_sink_input_new_data_init(&data); + data.driver = __FILE__; + data.name = c->client->name; + pa_sink_input_new_data_set_sample_spec(&data, &p->sample_spec); + data.module = p->module; + data.client = c->client; - if (!(c->sink_input = pa_sink_input_new(sink, __FILE__, c->client->name, &p->sample_spec, NULL, NULL, 0, -1))) { + if (!(c->sink_input = pa_sink_input_new(p->core, &data, 0))) { pa_log(__FILE__": Failed to create sink input."); goto fail; } - c->sink_input->owner = p->module; - c->sink_input->client = c->client; - c->sink_input->peek = sink_input_peek_cb; c->sink_input->drop = sink_input_drop_cb; c->sink_input->kill = sink_input_kill_cb; @@ -375,6 +374,8 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) assert(c->input_memblockq); pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5); c->playback.fragment_size = l/10; + + pa_sink_notify(c->sink_input->sink); } if (p->mode & RECORD) { -- cgit From a75e1ed9ef483c4c08f0fc963c0ea1a980f0c0e9 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Sun, 13 Aug 2006 19:55:17 +0000 Subject: implement hook_source_ouput_new. For this I modified the pa_source_output_new constructor to take a struct similar to what I already did for pa_sink_input_new() git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1250 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/protocol-simple.c | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) (limited to 'src/pulsecore/protocol-simple.c') diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 5071191a..3705986d 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -379,22 +379,20 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) } if (p->mode & RECORD) { - pa_source *source; + pa_source_output_new_data data; size_t l; - if (!(source = pa_namereg_get(p->core, p->source_name, PA_NAMEREG_SOURCE, 1))) { - pa_log(__FILE__": Failed to get source."); - goto fail; - } + pa_source_output_new_data_init(&data); + data.driver = __FILE__; + data.name = c->client->name; + pa_source_output_new_data_set_sample_spec(&data, &p->sample_spec); + data.module = p->module; + data.client = c->client; - c->source_output = pa_source_output_new(source, __FILE__, c->client->name, &p->sample_spec, NULL, -1); - if (!c->source_output) { + if (!(c->source_output = pa_source_output_new(p->core, &data, 0))) { pa_log(__FILE__": Failed to create source output."); goto fail; } - c->source_output->owner = p->module; - c->source_output->client = c->client; - c->source_output->push = source_output_push_cb; c->source_output->kill = source_output_kill_cb; c->source_output->get_latency = source_output_get_latency_cb; @@ -411,6 +409,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) NULL, p->core->memblock_stat); pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2); + pa_source_notify(c->source_output->source); } pa_iochannel_set_callback(c->io, io_callback, c); -- cgit From 0e436a6926af56f37a74a03bb5e143e078ca0d55 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 18 Aug 2006 19:55:18 +0000 Subject: Rework memory management to allow shared memory data transfer. The central idea is to allocate all audio memory blocks from a per-process memory pool which is available as read-only SHM segment to other local processes. Then, instead of writing the actual audio data to the socket just write references to this shared memory pool. To work optimally all memory blocks should now be of type PA_MEMBLOCK_POOL or PA_MEMBLOCK_POOL_EXTERNAL. The function pa_memblock_new() now generates memory blocks of this type by default. git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1266 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/protocol-simple.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'src/pulsecore/protocol-simple.c') diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 3705986d..924ee29e 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -128,7 +128,7 @@ static int do_read(struct connection *c) { } if (!c->playback.current_memblock) { - c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat); + c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2); assert(c->playback.current_memblock && c->playback.current_memblock->length >= l); c->playback.memblock_index = 0; } @@ -369,8 +369,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) pa_frame_size(&p->sample_spec), (size_t) -1, l/PLAYBACK_BUFFER_FRAGMENTS, - NULL, - p->core->memblock_stat); + NULL); assert(c->input_memblockq); pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5); c->playback.fragment_size = l/10; @@ -406,8 +405,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) pa_frame_size(&p->sample_spec), 1, 0, - NULL, - p->core->memblock_stat); + NULL); pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2); pa_source_notify(c->source_output->source); } -- cgit From e385d93e5aad6a6fce754c00c804ff1d6a6746d4 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 18 Aug 2006 21:38:40 +0000 Subject: remove all occurences of pa_logXXX(__FILE__": and replace them by pa_logXXX(" git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1272 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/protocol-simple.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'src/pulsecore/protocol-simple.c') diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 924ee29e..6bfba875 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -134,7 +134,7 @@ static int do_read(struct connection *c) { } if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) { - pa_log_debug(__FILE__": read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno)); + pa_log_debug("read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno)); return -1; } @@ -168,7 +168,7 @@ static int do_write(struct connection *c) { if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) { pa_memblock_unref(chunk.memblock); - pa_log(__FILE__": write(): %s", pa_cstrerror(errno)); + pa_log("write(): %s", pa_cstrerror(errno)); return -1; } @@ -315,7 +315,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) assert(s && io && p); if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) { - pa_log(__FILE__": Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS); + pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS); pa_iochannel_free(io); return; } @@ -351,7 +351,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) data.client = c->client; if (!(c->sink_input = pa_sink_input_new(p->core, &data, 0))) { - pa_log(__FILE__": Failed to create sink input."); + pa_log("Failed to create sink input."); goto fail; } @@ -389,7 +389,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) data.client = c->client; if (!(c->source_output = pa_source_output_new(p->core, &data, 0))) { - pa_log(__FILE__": Failed to create source output."); + pa_log("Failed to create source output."); goto fail; } c->source_output->push = source_output_push_cb; @@ -437,7 +437,7 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv p->sample_spec = core->default_sample_spec; if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) { - pa_log(__FILE__": Failed to parse sample type specification."); + pa_log("Failed to parse sample type specification."); goto fail; } @@ -446,20 +446,20 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv enable = 0; if (pa_modargs_get_value_boolean(ma, "record", &enable) < 0) { - pa_log(__FILE__": record= expects a numeric argument."); + pa_log("record= expects a numeric argument."); goto fail; } p->mode = enable ? RECORD : 0; enable = 1; if (pa_modargs_get_value_boolean(ma, "playback", &enable) < 0) { - pa_log(__FILE__": playback= expects a numeric argument."); + pa_log("playback= expects a numeric argument."); goto fail; } p->mode |= enable ? PLAYBACK : 0; if ((p->mode & (RECORD|PLAYBACK)) == 0) { - pa_log(__FILE__": neither playback nor recording enabled for protocol."); + pa_log("neither playback nor recording enabled for protocol."); goto fail; } -- cgit From d210ebbb09daddb2c8c8e8e77243e088b0b19c4d Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Tue, 26 Sep 2006 23:50:56 +0000 Subject: rework memory block management to be thread-safe and mostly lock-free. pa_memblock is now an opaque structure. Access to its fields is now done through various accessor functions in a thread-safe manner. pa_memblock_acquire() and pa_memblock_release() are now used to access the attached audio data. Why? To allow safe manipulation of the memory pointer maintained by the memory block. Internally _acquire() and _release() maintain a reference counter. Please do not confuse this reference counter whith the one maintained by pa_memblock_ref()/_unref()! As a side effect this patch removes all direct usages of AO_t and replaces it with pa_atomic_xxx based code. This stuff needs some serious testing love. Especially if threads are actively used. git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1404 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/protocol-simple.c | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) (limited to 'src/pulsecore/protocol-simple.c') diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 6bfba875..bf203e42 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -113,6 +113,7 @@ static int do_read(struct connection *c) { pa_memchunk chunk; ssize_t r; size_t l; + void *p; if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq))) return 0; @@ -121,7 +122,7 @@ static int do_read(struct connection *c) { l = c->playback.fragment_size; if (c->playback.current_memblock) - if (c->playback.current_memblock->length - c->playback.memblock_index < l) { + if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) { pa_memblock_unref(c->playback.current_memblock); c->playback.current_memblock = NULL; c->playback.memblock_index = 0; @@ -129,15 +130,20 @@ static int do_read(struct connection *c) { if (!c->playback.current_memblock) { c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2); - assert(c->playback.current_memblock && c->playback.current_memblock->length >= l); + assert(c->playback.current_memblock); + assert(pa_memblock_get_length(c->playback.current_memblock) >= l); c->playback.memblock_index = 0; } + + p = pa_memblock_acquire(c->playback.current_memblock); - if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) { + if ((r = pa_iochannel_read(c->io, (uint8_t*) p + c->playback.memblock_index, l)) <= 0) { + pa_memblock_release(c->playback.current_memblock); pa_log_debug("read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno)); return -1; } + pa_memblock_release(c->playback.current_memblock); chunk.memblock = c->playback.current_memblock; chunk.index = c->playback.memblock_index; chunk.length = r; @@ -156,7 +162,8 @@ static int do_read(struct connection *c) { static int do_write(struct connection *c) { pa_memchunk chunk; ssize_t r; - + void *p; + if (!c->source_output) return 0; @@ -165,12 +172,17 @@ static int do_write(struct connection *c) { return 0; assert(chunk.memblock && chunk.length); + + p = pa_memblock_acquire(chunk.memblock); - if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) { + if ((r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length)) < 0) { + pa_memblock_release(chunk.memblock); pa_memblock_unref(chunk.memblock); pa_log("write(): %s", pa_cstrerror(errno)); return -1; } + + pa_memblock_release(chunk.memblock); pa_memblockq_drop(c->output_memblockq, &chunk, r); pa_memblock_unref(chunk.memblock); -- cgit From 8dc62142765249addf131b058c27f931ede1776b Mon Sep 17 00:00:00 2001 From: Pierre Ossman Date: Mon, 6 Nov 2006 13:06:01 +0000 Subject: Revert r1404 and keep it on a development branch until it is fully tested. git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1409 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/protocol-simple.c | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) (limited to 'src/pulsecore/protocol-simple.c') diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index bf203e42..6bfba875 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -113,7 +113,6 @@ static int do_read(struct connection *c) { pa_memchunk chunk; ssize_t r; size_t l; - void *p; if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq))) return 0; @@ -122,7 +121,7 @@ static int do_read(struct connection *c) { l = c->playback.fragment_size; if (c->playback.current_memblock) - if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) { + if (c->playback.current_memblock->length - c->playback.memblock_index < l) { pa_memblock_unref(c->playback.current_memblock); c->playback.current_memblock = NULL; c->playback.memblock_index = 0; @@ -130,20 +129,15 @@ static int do_read(struct connection *c) { if (!c->playback.current_memblock) { c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2); - assert(c->playback.current_memblock); - assert(pa_memblock_get_length(c->playback.current_memblock) >= l); + assert(c->playback.current_memblock && c->playback.current_memblock->length >= l); c->playback.memblock_index = 0; } - - p = pa_memblock_acquire(c->playback.current_memblock); - if ((r = pa_iochannel_read(c->io, (uint8_t*) p + c->playback.memblock_index, l)) <= 0) { - pa_memblock_release(c->playback.current_memblock); + if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) { pa_log_debug("read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno)); return -1; } - pa_memblock_release(c->playback.current_memblock); chunk.memblock = c->playback.current_memblock; chunk.index = c->playback.memblock_index; chunk.length = r; @@ -162,8 +156,7 @@ static int do_read(struct connection *c) { static int do_write(struct connection *c) { pa_memchunk chunk; ssize_t r; - void *p; - + if (!c->source_output) return 0; @@ -172,17 +165,12 @@ static int do_write(struct connection *c) { return 0; assert(chunk.memblock && chunk.length); - - p = pa_memblock_acquire(chunk.memblock); - if ((r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length)) < 0) { - pa_memblock_release(chunk.memblock); + if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) { pa_memblock_unref(chunk.memblock); pa_log("write(): %s", pa_cstrerror(errno)); return -1; } - - pa_memblock_release(chunk.memblock); pa_memblockq_drop(c->output_memblockq, &chunk, r); pa_memblock_unref(chunk.memblock); -- cgit From 521daf6f0ac4fa6a2fbfb5d523c0c743342dca2b Mon Sep 17 00:00:00 2001 From: Pierre Ossman Date: Thu, 4 Jan 2007 13:43:45 +0000 Subject: Huge trailing whitespace cleanup. Let's keep the tree pure from here on, mmmkay? git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1418 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/protocol-simple.c | 54 ++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 27 deletions(-) (limited to 'src/pulsecore/protocol-simple.c') diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 6bfba875..0a7a7acb 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -2,17 +2,17 @@ /*** This file is part of PulseAudio. - + PulseAudio is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. - + PulseAudio is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - + You should have received a copy of the GNU Lesser General Public License along with PulseAudio; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 @@ -55,7 +55,7 @@ struct connection { pa_defer_event *defer_event; int dead; - + struct { pa_memblock *current_memblock; size_t memblock_index, fragment_size; @@ -120,7 +120,7 @@ static int do_read(struct connection *c) { if (l > c->playback.fragment_size) l = c->playback.fragment_size; - if (c->playback.current_memblock) + if (c->playback.current_memblock) if (c->playback.current_memblock->length - c->playback.memblock_index < l) { pa_memblock_unref(c->playback.current_memblock); c->playback.current_memblock = NULL; @@ -132,7 +132,7 @@ static int do_read(struct connection *c) { assert(c->playback.current_memblock && c->playback.current_memblock->length >= l); c->playback.memblock_index = 0; } - + if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) { pa_log_debug("read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno)); return -1; @@ -144,12 +144,12 @@ static int do_read(struct connection *c) { assert(chunk.memblock); c->playback.memblock_index += r; - + assert(c->input_memblockq); pa_memblockq_push_align(c->input_memblockq, &chunk); assert(c->sink_input); pa_sink_notify(c->sink_input->sink); - + return 0; } @@ -158,25 +158,25 @@ static int do_write(struct connection *c) { ssize_t r; if (!c->source_output) - return 0; + return 0; assert(c->output_memblockq); if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0) return 0; - + assert(chunk.memblock && chunk.length); - + if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) { pa_memblock_unref(chunk.memblock); pa_log("write(): %s", pa_cstrerror(errno)); return -1; } - + pa_memblockq_drop(c->output_memblockq, &chunk, r); pa_memblock_unref(chunk.memblock); pa_source_notify(c->source_output->source); - + return 0; } @@ -188,7 +188,7 @@ static void do_work(struct connection *c) { if (c->dead) return; - + if (pa_iochannel_is_readable(c->io)) { if (do_read(c) < 0) goto fail; @@ -198,7 +198,7 @@ static void do_work(struct connection *c) { if (pa_iochannel_is_writable(c->io)) { if (do_write(c) < 0) goto fail; - } + } return; @@ -206,7 +206,7 @@ fail: if (c->sink_input) { c->dead = 1; - + pa_iochannel_free(c->io); c->io = NULL; @@ -222,12 +222,12 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { struct connection*c; assert(i && i->userdata && chunk); c = i->userdata; - + if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) { - + if (c->dead) connection_free(c); - + return -1; } @@ -331,7 +331,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->playback.memblock_index = 0; c->playback.fragment_size = 0; c->dead = 0; - + pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname)); c->client = pa_client_new(p->core, __FILE__, cname); assert(c->client); @@ -354,7 +354,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) pa_log("Failed to create sink input."); goto fail; } - + c->sink_input->peek = sink_input_peek_cb; c->sink_input->drop = sink_input_drop_cb; c->sink_input->kill = sink_input_kill_cb; @@ -416,9 +416,9 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c); assert(c->defer_event); p->core->mainloop->defer_enable(c->defer_event, 0); - + return; - + fail: if (c) connection_free(c); @@ -443,7 +443,7 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL)); p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL)); - + enable = 0; if (pa_modargs_get_value_boolean(ma, "record", &enable) < 0) { pa_log("record= expects a numeric argument."); @@ -462,9 +462,9 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv pa_log("neither playback nor recording enabled for protocol."); goto fail; } - + pa_socket_server_set_callback(p->server, on_connection, p); - + return p; fail: @@ -481,7 +481,7 @@ void pa_protocol_simple_free(pa_protocol_simple *p) { if (p->connections) { while((c = pa_idxset_first(p->connections, NULL))) connection_free(c); - + pa_idxset_free(p->connections, NULL, NULL); } -- cgit From 06211b7c8fd329137ae9003818543912a87d9898 Mon Sep 17 00:00:00 2001 From: Pierre Ossman Date: Tue, 13 Feb 2007 15:35:19 +0000 Subject: Add copyright notices to all relevant files. (based on svn log) git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1426 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/protocol-simple.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/pulsecore/protocol-simple.c') diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 0a7a7acb..31ad6ddd 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -3,6 +3,8 @@ /*** This file is part of PulseAudio. + Copyright 2004-2006 Lennart Poettering + PulseAudio is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2 of the License, -- cgit From a67c21f093202f142438689d3f7cfbdf4ea82eea Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Sun, 28 Oct 2007 19:13:50 +0000 Subject: merge 'lennart' branch back into trunk. git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1971 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/protocol-simple.c | 368 ++++++++++++++++++++++++++++------------ 1 file changed, 255 insertions(+), 113 deletions(-) (limited to 'src/pulsecore/protocol-simple.c') diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 31ad6ddd..64e2a81c 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -25,7 +25,6 @@ #include #endif -#include #include #include #include @@ -41,101 +40,154 @@ #include #include #include +#include +#include #include "protocol-simple.h" /* Don't allow more than this many concurrent connections */ #define MAX_CONNECTIONS 10 -struct connection { +typedef struct connection { + pa_msgobject parent; pa_protocol_simple *protocol; pa_iochannel *io; pa_sink_input *sink_input; pa_source_output *source_output; pa_client *client; pa_memblockq *input_memblockq, *output_memblockq; - pa_defer_event *defer_event; int dead; struct { pa_memblock *current_memblock; size_t memblock_index, fragment_size; + pa_atomic_t missing; } playback; -}; +} connection; + +PA_DECLARE_CLASS(connection); +#define CONNECTION(o) (connection_cast(o)) +static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject); struct pa_protocol_simple { pa_module *module; pa_core *core; pa_socket_server*server; pa_idxset *connections; + enum { RECORD = 1, PLAYBACK = 2, DUPLEX = 3 } mode; + pa_sample_spec sample_spec; char *source_name, *sink_name; }; +enum { + SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */ + SINK_INPUT_MESSAGE_DISABLE_PREBUF /* disabled prebuf, get playback started. */ +}; + +enum { + CONNECTION_MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */ + CONNECTION_MESSAGE_POST_DATA, /* data from source output to main loop */ + CONNECTION_MESSAGE_UNLINK_CONNECTION /* Please drop a aconnection now */ +}; + + #define PLAYBACK_BUFFER_SECONDS (.5) #define PLAYBACK_BUFFER_FRAGMENTS (10) #define RECORD_BUFFER_SECONDS (5) #define RECORD_BUFFER_FRAGMENTS (100) -static void connection_free(struct connection *c) { - assert(c); +static void connection_unlink(connection *c) { + pa_assert(c); - pa_idxset_remove_by_data(c->protocol->connections, c, NULL); + if (!c->protocol) + return; - if (c->playback.current_memblock) - pa_memblock_unref(c->playback.current_memblock); if (c->sink_input) { - pa_sink_input_disconnect(c->sink_input); + pa_sink_input_unlink(c->sink_input); pa_sink_input_unref(c->sink_input); + c->sink_input = NULL; } + if (c->source_output) { - pa_source_output_disconnect(c->source_output); + pa_source_output_unlink(c->source_output); pa_source_output_unref(c->source_output); + c->source_output = NULL; } - if (c->client) + + if (c->client) { pa_client_free(c->client); - if (c->io) + c->client = NULL; + } + + if (c->io) { pa_iochannel_free(c->io); + c->io = NULL; + } + + pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c); + c->protocol = NULL; + connection_unref(c); +} + +static void connection_free(pa_object *o) { + connection *c = CONNECTION(o); + pa_assert(c); + + connection_unref(c); + + if (c->playback.current_memblock) + pa_memblock_unref(c->playback.current_memblock); + if (c->input_memblockq) pa_memblockq_free(c->input_memblockq); if (c->output_memblockq) pa_memblockq_free(c->output_memblockq); - if (c->defer_event) - c->protocol->core->mainloop->defer_free(c->defer_event); + pa_xfree(c); } -static int do_read(struct connection *c) { +static int do_read(connection *c) { pa_memchunk chunk; ssize_t r; size_t l; + void *p; + + connection_assert_ref(c); - if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq))) + if (!c->sink_input || (l = pa_atomic_load(&c->playback.missing)) <= 0) return 0; if (l > c->playback.fragment_size) l = c->playback.fragment_size; if (c->playback.current_memblock) - if (c->playback.current_memblock->length - c->playback.memblock_index < l) { + if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) { pa_memblock_unref(c->playback.current_memblock); c->playback.current_memblock = NULL; c->playback.memblock_index = 0; } if (!c->playback.current_memblock) { - c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2); - assert(c->playback.current_memblock && c->playback.current_memblock->length >= l); + pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, l)); c->playback.memblock_index = 0; } - if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) { + p = pa_memblock_acquire(c->playback.current_memblock); + r = pa_iochannel_read(c->io, (uint8_t*) p + c->playback.memblock_index, l); + pa_memblock_release(c->playback.current_memblock); + + if (r <= 0) { + + if (r < 0 && (errno == EINTR || errno == EAGAIN)) + return 0; + pa_log_debug("read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno)); return -1; } @@ -143,50 +195,55 @@ static int do_read(struct connection *c) { chunk.memblock = c->playback.current_memblock; chunk.index = c->playback.memblock_index; chunk.length = r; - assert(chunk.memblock); c->playback.memblock_index += r; - assert(c->input_memblockq); - pa_memblockq_push_align(c->input_memblockq, &chunk); - assert(c->sink_input); - pa_sink_notify(c->sink_input->sink); + pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL); + pa_atomic_sub(&c->playback.missing, r); return 0; } -static int do_write(struct connection *c) { +static int do_write(connection *c) { pa_memchunk chunk; ssize_t r; + void *p; + + connection_assert_ref(c); if (!c->source_output) return 0; - assert(c->output_memblockq); - if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0) + if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0) { +/* pa_log("peek failed"); */ return 0; + } + + pa_assert(chunk.memblock); + pa_assert(chunk.length); - assert(chunk.memblock && chunk.length); + p = pa_memblock_acquire(chunk.memblock); + r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length); + pa_memblock_release(chunk.memblock); + + pa_memblock_unref(chunk.memblock); + + if (r < 0) { + + if (errno == EINTR || errno == EAGAIN) + return 0; - if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) { - pa_memblock_unref(chunk.memblock); pa_log("write(): %s", pa_cstrerror(errno)); return -1; } - pa_memblockq_drop(c->output_memblockq, &chunk, r); - pa_memblock_unref(chunk.memblock); - - pa_source_notify(c->source_output->source); + pa_memblockq_drop(c->output_memblockq, r); return 0; } -static void do_work(struct connection *c) { - assert(c); - - assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); - c->protocol->core->mainloop->defer_enable(c->defer_event, 0); +static void do_work(connection *c) { + connection_assert_ref(c); if (c->dead) return; @@ -207,103 +264,182 @@ static void do_work(struct connection *c) { fail: if (c->sink_input) { + + /* If there is a sink input, we first drain what we already have read before shutting down the connection */ c->dead = 1; pa_iochannel_free(c->io); c->io = NULL; - pa_memblockq_prebuf_disable(c->input_memblockq); - pa_sink_notify(c->sink_input->sink); + pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL); } else - connection_free(c); + connection_unlink(c); } -/*** sink_input callbacks ***/ +static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) { + connection *c = CONNECTION(o); + connection_assert_ref(c); -static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { - struct connection*c; - assert(i && i->userdata && chunk); - c = i->userdata; + switch (code) { + case CONNECTION_MESSAGE_REQUEST_DATA: + do_work(c); + break; - if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) { + case CONNECTION_MESSAGE_POST_DATA: +/* pa_log("got data %u", chunk->length); */ + pa_memblockq_push_align(c->output_memblockq, chunk); + do_work(c); + break; - if (c->dead) - connection_free(c); - - return -1; + case CONNECTION_MESSAGE_UNLINK_CONNECTION: + connection_unlink(c); + break; } return 0; } -static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) { - struct connection*c = i->userdata; - assert(i && c && length); +/*** sink_input callbacks ***/ + +/* Called from thread context */ +static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) { + pa_sink_input *i = PA_SINK_INPUT(o); + connection*c; + + pa_sink_input_assert_ref(i); + c = CONNECTION(i->userdata); + connection_assert_ref(c); + + switch (code) { + + case SINK_INPUT_MESSAGE_POST_DATA: { + pa_assert(chunk); + + /* New data from the main loop */ + pa_memblockq_push_align(c->input_memblockq, chunk); + +/* pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */ + + return 0; + } + + case SINK_INPUT_MESSAGE_DISABLE_PREBUF: { + pa_memblockq_prebuf_disable(c->input_memblockq); + return 0; + } + + case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { + pa_usec_t *r = userdata; - pa_memblockq_drop(c->input_memblockq, chunk, length); + *r = pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec); - /* do something */ - assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); - c->protocol->core->mainloop->defer_enable(c->defer_event, 1); + /* Fall through, the default handler will add in the extra + * latency added by the resampler */ + } + + default: + return pa_sink_input_process_msg(o, code, userdata, offset, chunk); + } } -static void sink_input_kill_cb(pa_sink_input *i) { - assert(i && i->userdata); - connection_free((struct connection *) i->userdata); +/* Called from thread context */ +static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { + connection *c; + int r; + + pa_assert(i); + c = CONNECTION(i->userdata); + connection_assert_ref(c); + pa_assert(chunk); + + r = pa_memblockq_peek(c->input_memblockq, chunk); + +/* pa_log("peeked %u %i", r >= 0 ? chunk->length: 0, r); */ + + if (c->dead && r < 0) + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL); + + return r; } +/* Called from thread context */ +static void sink_input_drop_cb(pa_sink_input *i, size_t length) { + connection *c; + size_t old, new; + + pa_assert(i); + c = CONNECTION(i->userdata); + connection_assert_ref(c); + pa_assert(length); + + old = pa_memblockq_missing(c->input_memblockq); + pa_memblockq_drop(c->input_memblockq, length); + new = pa_memblockq_missing(c->input_memblockq); -static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) { - struct connection*c = i->userdata; - assert(i && c); - return pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec); + if (new > old) { + if (pa_atomic_add(&c->playback.missing, new - old) <= 0) + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); + } +} + +/* Called from main context */ +static void sink_input_kill_cb(pa_sink_input *i) { + pa_sink_input_assert_ref(i); + + connection_unlink(CONNECTION(i->userdata)); } /*** source_output callbacks ***/ +/* Called from thread context */ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { - struct connection *c = o->userdata; - assert(o && c && chunk); + connection *c; - pa_memblockq_push(c->output_memblockq, chunk); + pa_assert(o); + c = CONNECTION(o->userdata); + pa_assert(c); + pa_assert(chunk); - /* do something */ - assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); - c->protocol->core->mainloop->defer_enable(c->defer_event, 1); + pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL); } +/* Called from main context */ static void source_output_kill_cb(pa_source_output *o) { - assert(o && o->userdata); - connection_free((struct connection *) o->userdata); + pa_source_output_assert_ref(o); + + connection_unlink(CONNECTION(o->userdata)); } +/* Called from main context */ static pa_usec_t source_output_get_latency_cb(pa_source_output *o) { - struct connection*c = o->userdata; - assert(o && c); + connection*c; + + pa_assert(o); + c = CONNECTION(o->userdata); + pa_assert(c); + return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec); } /*** client callbacks ***/ -static void client_kill_cb(pa_client *c) { - assert(c && c->userdata); - connection_free((struct connection *) c->userdata); +static void client_kill_cb(pa_client *client) { + connection*c; + + pa_assert(client); + c = CONNECTION(client->userdata); + pa_assert(c); + + connection_unlink(c); } /*** pa_iochannel callbacks ***/ static void io_callback(pa_iochannel*io, void *userdata) { - struct connection *c = userdata; - assert(io && c && c->io == io); - - do_work(c); -} - -/*** fixed callback ***/ + connection *c = CONNECTION(userdata); -static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) { - struct connection *c = userdata; - assert(a && c && c->defer_event == e); + connection_assert_ref(c); + pa_assert(io); do_work(c); } @@ -312,9 +448,12 @@ static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) { pa_protocol_simple *p = userdata; - struct connection *c = NULL; + connection *c = NULL; char cname[256]; - assert(s && io && p); + + pa_assert(s); + pa_assert(io); + pa_assert(p); if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) { pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS); @@ -322,21 +461,22 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) return; } - c = pa_xmalloc(sizeof(struct connection)); + c = pa_msgobject_new(connection); + c->parent.parent.free = connection_free; + c->parent.process_msg = connection_process_msg; c->io = io; c->sink_input = NULL; c->source_output = NULL; - c->defer_event = NULL; c->input_memblockq = c->output_memblockq = NULL; c->protocol = p; c->playback.current_memblock = NULL; c->playback.memblock_index = 0; c->playback.fragment_size = 0; c->dead = 0; + pa_atomic_store(&c->playback.missing, 0); pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname)); - c->client = pa_client_new(p->core, __FILE__, cname); - assert(c->client); + pa_assert_se(c->client = pa_client_new(p->core, __FILE__, cname)); c->client->owner = p->module; c->client->kill = client_kill_cb; c->client->userdata = c; @@ -357,10 +497,10 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) goto fail; } + c->sink_input->parent.process_msg = sink_input_process_msg; c->sink_input->peek = sink_input_peek_cb; c->sink_input->drop = sink_input_drop_cb; c->sink_input->kill = sink_input_kill_cb; - c->sink_input->get_latency = sink_input_get_latency_cb; c->sink_input->userdata = c; l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS); @@ -372,11 +512,12 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) (size_t) -1, l/PLAYBACK_BUFFER_FRAGMENTS, NULL); - assert(c->input_memblockq); pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5); - c->playback.fragment_size = l/10; + c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS; + + pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq)); - pa_sink_notify(c->sink_input->sink); + pa_sink_input_put(c->sink_input); } if (p->mode & RECORD) { @@ -409,29 +550,29 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) 0, NULL); pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2); - pa_source_notify(c->source_output->source); + + pa_source_output_put(c->source_output); } pa_iochannel_set_callback(c->io, io_callback, c); pa_idxset_put(p->connections, c, NULL); - c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c); - assert(c->defer_event); - p->core->mainloop->defer_enable(c->defer_event, 0); - return; fail: if (c) - connection_free(c); + connection_unlink(c); } pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) { pa_protocol_simple* p = NULL; int enable; - assert(core && server && ma); - p = pa_xmalloc0(sizeof(pa_protocol_simple)); + pa_assert(core); + pa_assert(server); + pa_assert(ma); + + p = pa_xnew0(pa_protocol_simple, 1); p->module = m; p->core = core; p->server = server; @@ -472,23 +613,24 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv fail: if (p) pa_protocol_simple_free(p); + return NULL; } void pa_protocol_simple_free(pa_protocol_simple *p) { - struct connection *c; - assert(p); + connection *c; + pa_assert(p); if (p->connections) { while((c = pa_idxset_first(p->connections, NULL))) - connection_free(c); + connection_unlink(c); pa_idxset_free(p->connections, NULL, NULL); } if (p->server) pa_socket_server_unref(p->server); + pa_xfree(p); } - -- cgit From d17bb53d3ebfbd7046719400264bd87830c140d8 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Tue, 13 Nov 2007 17:37:44 +0000 Subject: Completely rework ALSA device selection code: choose the device to open depending on the requested number of channels and channel map. In most cases it will now suffice to set default-channels=6 to enable 5.1 sound for all devices that support it git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@2050 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/protocol-simple.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/pulsecore/protocol-simple.c') diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 64e2a81c..777def30 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -566,7 +566,7 @@ fail: pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) { pa_protocol_simple* p = NULL; - int enable; + pa_bool_t enable; pa_assert(core); pa_assert(server); @@ -587,7 +587,7 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL)); p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL)); - enable = 0; + enable = FALSE; if (pa_modargs_get_value_boolean(ma, "record", &enable) < 0) { pa_log("record= expects a numeric argument."); goto fail; -- cgit