From e8d1185c4221fef9d712c1f375d8e592721b6943 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Wed, 7 Jul 2004 00:22:46 +0000 Subject: draining ind native protocol fixes in callback code on object destruction simple protocol git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@52 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/Makefile.am | 12 +- src/client.c | 2 +- src/core.c | 3 + src/iochannel.h | 2 + src/main.c | 17 ++- src/memblockq.c | 67 ++++++--- src/memblockq.h | 22 ++- src/module-cli.c | 4 +- src/module-oss-mmap.c | 2 +- src/module-oss.c | 4 +- src/module-pipe-sink.c | 4 +- src/module-protocol-stub.c | 4 +- src/module.c | 6 +- src/module.h | 4 + src/pacat-simple.c | 8 +- src/pacat.c | 76 +++++++--- src/pdispatch.c | 107 +++++++++---- src/pdispatch.h | 10 +- src/polyp.c | 363 +++++++++++++++++++++++++++++++++------------ src/polyp.h | 9 ++ src/polypdef.h | 5 +- src/protocol-esound.c | 18 ++- src/protocol-native-spec.h | 2 + src/protocol-native.c | 159 +++++++++++++------- src/protocol-simple.c | 20 +-- src/pstream.c | 118 +++++++++------ src/pstream.h | 10 +- src/sample.h | 2 + src/simple.c | 54 ++++--- src/sink.c | 12 +- src/sinkinput.c | 17 ++- src/socket-client.h | 2 + src/socket-server.h | 2 + src/source.c | 6 +- src/sourceoutput.c | 13 +- src/todo | 10 +- src/util.c | 15 +- src/util.h | 2 + 38 files changed, 836 insertions(+), 357 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index 70e7e09b..d7002caf 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -220,6 +220,7 @@ libpolyp_la_SOURCES = polyp.c polyp.h \ protocol-native-spec.h \ mainloop-api.c mainloop-api.h \ mainloop.c mainloop.h \ + mainloop-signal.c mainloop-signal.h \ idxset.c idxset.h \ util.c util.h \ memblock.c memblock.h \ @@ -237,12 +238,13 @@ libpolyp_error_la_CFLAGS = $(AM_CFLAGS) libpolyp_simple_la_SOURCES = simple.c simple.h libpolyp_simple_la_CFLAGS = $(AM_CFLAGS) -libpolyp_simple_la_LIBADD = libpolyp.la #libpolyp-error.la +libpolyp_simple_la_LIBADD = libpolyp.la +#libpolyp-error.la -pacat_SOURCES = pacat.c #$(libpolyp_la_SOURCES) -pacat_LDADD = libpolyp.la +pacat_SOURCES = pacat.c $(libpolyp_la_SOURCES) $(libpolyp_error_la_SOURCES) +#pacat_LDADD = libpolyp.la pacat_CFLAGS = $(AM_CFLAGS) -pacat_simple_SOURCES = pacat-simple.c -pacat_simple_LDADD = libpolyp-simple.a +pacat_simple_SOURCES = pacat-simple.c $(libpolyp_la_SOURCES) $(libpolyp_simple_la_SOURCES) $(libpolyp_error_la_SOURCES) +#pacat_simple_LDADD = libpolyp-simple.la libpolyp-error.la pacat_simple_CFLAGS = $(AM_CFLAGS) diff --git a/src/client.c b/src/client.c index 7f1648a7..d07f188f 100644 --- a/src/client.c +++ b/src/client.c @@ -59,7 +59,7 @@ char *pa_client_list_to_string(struct pa_core *c) { pa_strbuf_printf(s, "%u client(s).\n", pa_idxset_ncontents(c->clients)); for (client = pa_idxset_first(c->clients, &index); client; client = pa_idxset_next(c->clients, &index)) - pa_strbuf_printf(s, " index: %u, name: <%s>, protocol_name: <%s>\n", client->index, client->name, client->protocol_name); + pa_strbuf_printf(s, " index: %u\n\tname: <%s>\n\tprotocol_name: <%s>\n", client->index, client->name, client->protocol_name); return pa_strbuf_tostring_free(s); } diff --git a/src/core.c b/src/core.c index 46159037..a1fe7d97 100644 --- a/src/core.c +++ b/src/core.c @@ -7,6 +7,7 @@ #include "sink.h" #include "source.h" #include "namereg.h" +#include "util.h" struct pa_core* pa_core_new(struct pa_mainloop_api *m) { struct pa_core* c; @@ -24,6 +25,8 @@ struct pa_core* pa_core_new(struct pa_mainloop_api *m) { c->modules = NULL; c->namereg = NULL; + + pa_check_for_sigpipe(); return c; }; diff --git a/src/iochannel.h b/src/iochannel.h index c550af19..1a5057d6 100644 --- a/src/iochannel.h +++ b/src/iochannel.h @@ -4,6 +4,8 @@ #include #include "mainloop-api.h" +/* It is safe to destroy the calling iochannel object from the callback */ + struct pa_iochannel; struct pa_iochannel* pa_iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd); diff --git a/src/main.c b/src/main.c index 88552fed..c7a83fec 100644 --- a/src/main.c +++ b/src/main.c @@ -12,10 +12,16 @@ static struct pa_mainloop *mainloop; -static void signal_callback(void *id, int sig, void *userdata) { +static void exit_signal_callback(void *id, int sig, void *userdata) { struct pa_mainloop_api* m = pa_mainloop_get_api(mainloop); m->quit(m, 1); - fprintf(stderr, "main: got signal.\n"); + fprintf(stderr, __FILE__": got signal.\n"); +} + +static void aux_signal_callback(void *id, int sig, void *userdata) { + struct pa_core *c = userdata; + assert(c); + pa_module_load(c, sig == SIGUSR1 ? "module-cli" : "module-cli-protocol-unix", NULL); } int main(int argc, char *argv[]) { @@ -30,12 +36,12 @@ int main(int argc, char *argv[]) { r = pa_signal_init(pa_mainloop_get_api(mainloop)); assert(r == 0); - pa_signal_register(SIGINT, signal_callback, NULL); + pa_signal_register(SIGINT, exit_signal_callback, NULL); signal(SIGPIPE, SIG_IGN); c = pa_core_new(pa_mainloop_get_api(mainloop)); assert(c); - + pa_module_load(c, "module-oss", "/dev/dsp"); /* pa_module_load(c, "module-pipe-sink", NULL);*/ pa_module_load(c, "module-simple-protocol-tcp", NULL); @@ -46,6 +52,9 @@ int main(int argc, char *argv[]) { pa_module_load(c, "module-native-protocol-unix", NULL); pa_module_load(c, "module-esound-protocol-tcp", NULL); pa_module_load(c, "module-cli", NULL); + + pa_signal_register(SIGUSR1, aux_signal_callback, c); + pa_signal_register(SIGUSR2, aux_signal_callback, c); fprintf(stderr, "main: mainloop entry.\n"); if (pa_mainloop_run(mainloop, &retval) < 0) diff --git a/src/memblockq.c b/src/memblockq.c index b70a67ff..e5dab687 100644 --- a/src/memblockq.c +++ b/src/memblockq.c @@ -15,30 +15,45 @@ struct memblock_list { struct pa_memblockq { struct memblock_list *blocks, *blocks_tail; unsigned n_blocks; - size_t total_length, maxlength, base, prebuf; + size_t current_length, maxlength, tlength, base, prebuf, minreq; int measure_delay; uint32_t delay; struct pa_mcalign *mcalign; }; -struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t base, size_t prebuf) { +struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t base, size_t prebuf, size_t minreq) { struct pa_memblockq* bq; - assert(maxlength && base); + assert(maxlength && base && maxlength); bq = malloc(sizeof(struct pa_memblockq)); assert(bq); bq->blocks = bq->blocks_tail = 0; bq->n_blocks = 0; - bq->total_length = 0; + + bq->current_length = 0; + + fprintf(stderr, "memblockq requested: maxlength=%u, tlength=%u, base=%u, prebuf=%u, minreq=%u\n", maxlength, tlength, base, prebuf, minreq); + bq->base = base; + bq->maxlength = ((maxlength+base-1)/base)*base; - bq->prebuf = prebuf == (size_t) -1 ? bq->maxlength/2 : prebuf; + assert(bq->maxlength >= base); + + bq->tlength = ((tlength+base-1)/base)*base; + if (bq->tlength == 0 || bq->tlength >= bq->maxlength) + bq->tlength = bq->maxlength; + bq->prebuf = (prebuf == (size_t) -1) ? bq->maxlength/2 : prebuf; + bq->prebuf = (bq->prebuf/base)*base; if (bq->prebuf > bq->maxlength) bq->prebuf = bq->maxlength; - assert(bq->maxlength >= base); + bq->minreq = (minreq/base)*base; + if (bq->minreq == 0) + bq->minreq = 1; + fprintf(stderr, "memblockq sanitized: maxlength=%u, tlength=%u, base=%u, prebuf=%u, minreq=%u\n", bq->maxlength, bq->tlength, bq->base, bq->prebuf, bq->minreq); + bq->measure_delay = 0; bq->delay = 0; @@ -88,7 +103,7 @@ void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk, bq->blocks_tail = q; bq->n_blocks++; - bq->total_length += chunk->length; + bq->current_length += chunk->length; pa_memblockq_shorten(bq, bq->maxlength); } @@ -96,7 +111,7 @@ void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk, int pa_memblockq_peek(struct pa_memblockq* bq, struct pa_memchunk *chunk) { assert(bq && chunk); - if (!bq->blocks || bq->total_length < bq->prebuf) + if (!bq->blocks || bq->current_length < bq->prebuf) return -1; bq->prebuf = 0; @@ -116,7 +131,7 @@ int memblockq_pop(struct memblockq* bq, struct pa_memchunk *chunk) { assert(bq && chunk); - if (!bq->blocks || bq->total_length < bq->prebuf) + if (!bq->blocks || bq->current_length < bq->prebuf) return -1; bq->prebuf = 0; @@ -127,7 +142,7 @@ int memblockq_pop(struct memblockq* bq, struct pa_memchunk *chunk) { *chunk = q->chunk; bq->n_blocks--; - bq->total_length -= chunk->length; + bq->current_length -= chunk->length; free(q); return 0; @@ -159,7 +174,7 @@ void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) { while (length > 0) { size_t l = length; - assert(bq->blocks && bq->total_length >= length); + assert(bq->blocks && bq->current_length >= length); if (l > bq->blocks->chunk.length) l = bq->blocks->chunk.length; @@ -169,7 +184,7 @@ void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) { bq->blocks->chunk.index += l; bq->blocks->chunk.length -= l; - bq->total_length -= l; + bq->current_length -= l; if (bq->blocks->chunk.length == 0) { struct memblock_list *q; @@ -192,12 +207,12 @@ void pa_memblockq_shorten(struct pa_memblockq *bq, size_t length) { size_t l; assert(bq); - if (bq->total_length <= length) + if (bq->current_length <= length) return; fprintf(stderr, "Warning! pa_memblockq_shorten()\n"); - l = bq->total_length - length; + l = bq->current_length - length; l /= bq->base; l *= bq->base; @@ -213,14 +228,13 @@ void pa_memblockq_empty(struct pa_memblockq *bq) { int pa_memblockq_is_readable(struct pa_memblockq *bq) { assert(bq); - return bq->total_length >= bq->prebuf; + return bq->current_length >= bq->prebuf; } int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length) { assert(bq); - assert(length <= bq->maxlength); - return bq->total_length + length <= bq->maxlength; + return bq->current_length + length <= bq->tlength; } uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq) { @@ -230,16 +244,20 @@ uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq) { uint32_t pa_memblockq_get_length(struct pa_memblockq *bq) { assert(bq); - return bq->total_length; + return bq->current_length; } -uint32_t pa_memblockq_missing_to(struct pa_memblockq *bq, size_t qlen) { - assert(bq && qlen); +uint32_t pa_memblockq_missing(struct pa_memblockq *bq) { + size_t l; + assert(bq); - if (bq->total_length >= qlen) + if (bq->current_length >= bq->tlength) return 0; - return qlen - bq->total_length; + l = bq->tlength - bq->current_length; + assert(l); + + return (l >= bq->minreq) ? l : 0; } void pa_memblockq_push_align(struct pa_memblockq* bq, const struct pa_memchunk *chunk, size_t delta) { @@ -264,3 +282,8 @@ void pa_memblockq_push_align(struct pa_memblockq* bq, const struct pa_memchunk * delta = 0; } } + +uint32_t pa_memblockq_get_minreq(struct pa_memblockq *bq) { + assert(bq); + return bq->minreq; +} diff --git a/src/memblockq.h b/src/memblockq.h index d8b9567f..bece4fd7 100644 --- a/src/memblockq.h +++ b/src/memblockq.h @@ -8,11 +8,18 @@ struct pa_memblockq; -/* Parameters: the maximum length of the memblock queue, a base value -for all operations (that is, all byte operations shall work on -multiples of this base value) and an amount of bytes to prebuffer -before having pa_memblockq_peek() succeed. */ -struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t base, size_t prebuf); +/* Parameters: + - maxlength: maximum length of queue. If more data is pushed into the queue, data from the front is dropped + - length: the target length of the queue. + - base: a base value for all metrics. Only multiples of this value are popped from the queue + - prebuf: before passing the first byte out, make sure that enough bytes are in the queue + - minreq: pa_memblockq_missing() will only return values greater than this value +*/ +struct pa_memblockq* pa_memblockq_new(size_t maxlength, + size_t tlength, + size_t base, + size_t prebuf, + size_t minreq); void pa_memblockq_free(struct pa_memblockq*bq); /* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. This is currently not implemented, however! */ @@ -46,6 +53,9 @@ uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq); uint32_t pa_memblockq_get_length(struct pa_memblockq *bq); /* Return how many bytes are missing in queue to the specified fill amount */ -uint32_t pa_memblockq_missing_to(struct pa_memblockq *bq, size_t qlen); +uint32_t pa_memblockq_missing(struct pa_memblockq *bq); + + +uint32_t pa_memblockq_get_minreq(struct pa_memblockq *bq); #endif diff --git a/src/module-cli.c b/src/module-cli.c index 7306ade5..a6e9582d 100644 --- a/src/module-cli.c +++ b/src/module-cli.c @@ -14,7 +14,7 @@ static void eof_cb(struct pa_cli*c, void *userdata) { pa_module_unload_request(m->core, m); } -int module_init(struct pa_core *c, struct pa_module*m) { +int pa_module_init(struct pa_core *c, struct pa_module*m) { struct pa_iochannel *io; assert(c && m); @@ -35,7 +35,7 @@ int module_init(struct pa_core *c, struct pa_module*m) { return 0; } -void module_done(struct pa_core *c, struct pa_module*m) { +void pa_module_done(struct pa_core *c, struct pa_module*m) { assert(c && m); pa_cli_free(m->userdata); diff --git a/src/module-oss-mmap.c b/src/module-oss-mmap.c index ef2b19d0..62c2cc2a 100644 --- a/src/module-oss-mmap.c +++ b/src/module-oss-mmap.c @@ -180,7 +180,7 @@ static uint32_t sink_get_latency_cb(struct pa_sink *s) { return pa_samples_usec(u->out_fill, &s->sample_spec); } -int module_init(struct pa_core *c, struct pa_module*m) { +int pa_module_init(struct pa_core *c, struct pa_module*m) { struct audio_buf_info info; struct userdata *u = NULL; char *p; diff --git a/src/module-oss.c b/src/module-oss.c index 310c89c0..5ec9d2d7 100644 --- a/src/module-oss.c +++ b/src/module-oss.c @@ -111,7 +111,7 @@ static uint32_t sink_get_latency_cb(struct pa_sink *s) { return pa_samples_usec(arg, &s->sample_spec); } -int module_init(struct pa_core *c, struct pa_module*m) { +int pa_module_init(struct pa_core *c, struct pa_module*m) { struct audio_buf_info info; struct userdata *u = NULL; char *p; @@ -224,7 +224,7 @@ fail: return -1; } -void module_done(struct pa_core *c, struct pa_module*m) { +void pa_module_done(struct pa_core *c, struct pa_module*m) { struct userdata *u; assert(c && m); diff --git a/src/module-pipe-sink.c b/src/module-pipe-sink.c index ea5c15db..efba3b5f 100644 --- a/src/module-pipe-sink.c +++ b/src/module-pipe-sink.c @@ -73,7 +73,7 @@ static void io_callback(struct pa_iochannel *io, void*userdata) { do_write(u); } -int module_init(struct pa_core *c, struct pa_module*m) { +int pa_module_init(struct pa_core *c, struct pa_module*m) { struct userdata *u = NULL; struct stat st; char *p; @@ -137,7 +137,7 @@ fail: return -1; } -void module_done(struct pa_core *c, struct pa_module*m) { +void pa_module_done(struct pa_core *c, struct pa_module*m) { struct userdata *u; assert(c && m); diff --git a/src/module-protocol-stub.c b/src/module-protocol-stub.c index 713e0ab8..885ea4c8 100644 --- a/src/module-protocol-stub.c +++ b/src/module-protocol-stub.c @@ -47,7 +47,7 @@ #endif #endif -int module_init(struct pa_core *c, struct pa_module*m) { +int pa_module_init(struct pa_core *c, struct pa_module*m) { struct pa_socket_server *s; assert(c && m); @@ -91,7 +91,7 @@ int module_init(struct pa_core *c, struct pa_module*m) { return 0; } -void module_done(struct pa_core *c, struct pa_module*m) { +void pa_module_done(struct pa_core *c, struct pa_module*m) { assert(c && m); protocol_free(m->userdata); diff --git a/src/module.c b/src/module.c index 468998ba..87df3b38 100644 --- a/src/module.c +++ b/src/module.c @@ -23,10 +23,10 @@ struct pa_module* pa_module_load(struct pa_core *c, const char *name, const char if (!(m->dl = lt_dlopenext(name))) goto fail; - if (!(m->init = lt_dlsym(m->dl, "module_init"))) + if (!(m->init = lt_dlsym(m->dl, "pa_module_init"))) goto fail; - if (!(m->done = lt_dlsym(m->dl, "module_done"))) + if (!(m->done = lt_dlsym(m->dl, "pa_module_done"))) goto fail; m->userdata = NULL; @@ -124,7 +124,7 @@ char *pa_module_list_to_string(struct pa_core *c) { pa_strbuf_printf(s, "%u module(s) loaded.\n", pa_idxset_ncontents(c->modules)); for (m = pa_idxset_first(c->modules, &index); m; m = pa_idxset_next(c->modules, &index)) - pa_strbuf_printf(s, " index: %u, name: <%s>, argument: <%s>\n", m->index, m->name, m->argument); + pa_strbuf_printf(s, " index: %u\n\tname: <%s>\n\targument: <%s>\n", m->index, m->name, m->argument); return pa_strbuf_tostring_free(s); } diff --git a/src/module.h b/src/module.h index 1cc7d775..2a9cf558 100644 --- a/src/module.h +++ b/src/module.h @@ -30,4 +30,8 @@ char *pa_module_list_to_string(struct pa_core *c); void pa_module_unload_request(struct pa_core *c, struct pa_module *m); +/* These to following prototypes are for module entrypoints and not implemented by the core */ +int pa_module_init(struct pa_core *c, struct pa_module*m); +void pa_module_done(struct pa_core *c, struct pa_module*m); + #endif diff --git a/src/pacat-simple.c b/src/pacat-simple.c index 5408221c..8b48bdd3 100644 --- a/src/pacat-simple.c +++ b/src/pacat-simple.c @@ -19,7 +19,7 @@ int main(int argc, char*argv[]) { int error; if (!(s = pa_simple_new(NULL, argv[0], PA_STREAM_PLAYBACK, NULL, "playback", &ss, NULL, &error))) { - fprintf(stderr, "Failed to connect to server: %s\n", pa_strerror(error)); + fprintf(stderr, __FILE__": pa_simple_new() failed: %s\n", pa_strerror(error)); goto finish; } @@ -31,16 +31,16 @@ int main(int argc, char*argv[]) { if (r == 0) /* eof */ break; - fprintf(stderr, "read() failed: %s\n", strerror(errno)); + fprintf(stderr, __FILE__": read() failed: %s\n", strerror(errno)); goto finish; } if (pa_simple_write(s, buf, r, &error) < 0) { - fprintf(stderr, "Failed to write data: %s\n", pa_strerror(error)); + fprintf(stderr, __FILE__": pa_simple_write() failed: %s\n", pa_strerror(error)); goto finish; } } - + ret = 0; finish: diff --git a/src/pacat.c b/src/pacat.c index c69148e6..75a94fc0 100644 --- a/src/pacat.c +++ b/src/pacat.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -6,7 +7,9 @@ #include #include "polyp.h" +#include "polyp-error.h" #include "mainloop.h" +#include "mainloop-signal.h" static struct pa_context *context = NULL; static struct pa_stream *stream = NULL; @@ -17,21 +20,29 @@ static size_t buffer_length = 0, buffer_index = 0; static void* stdin_source = NULL; +static void quit(int ret) { + assert(mainloop_api); + mainloop_api->quit(mainloop_api, ret); +} + static void context_die_callback(struct pa_context *c, void *userdata) { assert(c); fprintf(stderr, "Connection to server shut down, exiting.\n"); - mainloop_api->quit(mainloop_api, 1); + quit(1); } static void stream_die_callback(struct pa_stream *s, void *userdata) { assert(s); fprintf(stderr, "Stream deleted, exiting.\n"); - mainloop_api->quit(mainloop_api, 1); + quit(1); } static void do_write(size_t length) { size_t l; - assert(buffer && buffer_length); + assert(length); + + if (!buffer || !buffer_length) + return; l = length; if (l > buffer_length) @@ -50,8 +61,9 @@ static void do_write(size_t length) { static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) { assert(s && length); - - mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_INPUT); + + if (stdin_source) + mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_INPUT); if (!buffer) return; @@ -63,13 +75,12 @@ static void stream_complete_callback(struct pa_stream*s, int success, void *user assert(s); if (!success) { - fprintf(stderr, "Stream creation failed.\n"); - mainloop_api->quit(mainloop_api, 1); + fprintf(stderr, "Stream creation failed: %s\n", pa_strerror(pa_context_errno(pa_stream_get_context(s)))); + quit(1); return; } - pa_stream_set_die_callback(s, stream_die_callback, NULL); - pa_stream_set_write_callback(s, stream_write_callback, NULL); + fprintf(stderr, "Stream created.\n"); } static void context_complete_callback(struct pa_context *c, int success, void *userdata) { @@ -82,43 +93,59 @@ static void context_complete_callback(struct pa_context *c, int success, void *u assert(c && !stream); if (!success) { - fprintf(stderr, "Connection failed\n"); + fprintf(stderr, "Connection failed: %s\n", pa_strerror(pa_context_errno(c))); goto fail; } + + fprintf(stderr, "Connection established.\n"); if (!(stream = pa_stream_new(c, PA_STREAM_PLAYBACK, NULL, "pacat", &ss, NULL, stream_complete_callback, NULL))) { - fprintf(stderr, "pa_stream_new() failed.\n"); + fprintf(stderr, "pa_stream_new() failed: %s\n", pa_strerror(pa_context_errno(c))); goto fail; } + pa_stream_set_die_callback(stream, stream_die_callback, NULL); + pa_stream_set_write_callback(stream, stream_write_callback, NULL); + return; fail: - mainloop_api->quit(mainloop_api, 1); + quit(1); +} + +static void context_drain_complete(struct pa_context*c, void *userdata) { + quit(0); } static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { size_t l, w = 0; ssize_t r; - assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT); + assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT && stdin_source == id); if (buffer) { mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_NULL); return; } - if (!stream || !(l = w = pa_stream_writable_size(stream))) + if (!stream || !pa_stream_is_ready(stream) || !(l = w = pa_stream_writable_size(stream))) l = 4096; + buffer = malloc(l); assert(buffer); if ((r = read(fd, buffer, l)) <= 0) { - if (r == 0) - mainloop_api->quit(mainloop_api, 0); - else { + if (r == 0) { + fprintf(stderr, "Got EOF.\n"); + if (pa_context_drain(context, context_drain_complete, NULL) < 0) + quit(0); + else + fprintf(stderr, "Draining connection to server.\n"); + } else { fprintf(stderr, "read() failed: %s\n", strerror(errno)); - mainloop_api->quit(mainloop_api, 1); + quit(1); } + mainloop_api->cancel_io(mainloop_api, stdin_source); + stdin_source = NULL; return; } @@ -129,9 +156,15 @@ static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_m do_write(w); } + +static void exit_signal_callback(void *id, int sig, void *userdata) { + fprintf(stderr, "Got SIGINT, exiting.\n"); + quit(0); +} + int main(int argc, char *argv[]) { struct pa_mainloop* m; - int ret = 1; + int ret = 1, r; if (!(m = pa_mainloop_new())) { fprintf(stderr, "pa_mainloop_new() failed.\n"); @@ -140,6 +173,11 @@ int main(int argc, char *argv[]) { mainloop_api = pa_mainloop_get_api(m); + r = pa_signal_init(mainloop_api); + assert(r == 0); + pa_signal_register(SIGINT, exit_signal_callback, NULL); + signal(SIGPIPE, SIG_IGN); + if (!(stdin_source = mainloop_api->source_io(mainloop_api, STDIN_FILENO, PA_MAINLOOP_API_IO_EVENT_INPUT, stdin_callback, NULL))) { fprintf(stderr, "source_io() failed.\n"); goto quit; diff --git a/src/pdispatch.c b/src/pdispatch.c index c2db134d..ec454190 100644 --- a/src/pdispatch.c +++ b/src/pdispatch.c @@ -4,10 +4,26 @@ #include "pdispatch.h" #include "protocol-native-spec.h" +static const char *command_names[PA_COMMAND_MAX] = { + [PA_COMMAND_ERROR] = "ERROR", + [PA_COMMAND_TIMEOUT] = "TIMEOUT", + [PA_COMMAND_REPLY] = "REPLY", + [PA_COMMAND_CREATE_PLAYBACK_STREAM] = "CREATE_PLAYBACK_STREAM", + [PA_COMMAND_DELETE_PLAYBACK_STREAM] = "DELETE_PLAYBACK_STREAM", + [PA_COMMAND_CREATE_RECORD_STREAM] = "CREATE_RECORD_STREAM", + [PA_COMMAND_DELETE_RECORD_STREAM] = "DELETE_RECORD_STREAM", + [PA_COMMAND_AUTH] = "AUTH", + [PA_COMMAND_REQUEST] = "REQUEST", + [PA_COMMAND_EXIT] = "EXIT", + [PA_COMMAND_SET_NAME] = "SET_NAME", + [PA_COMMAND_LOOKUP_SINK] = "LOOKUP_SINK", + [PA_COMMAND_LOOKUP_SOURCE] = "LOOKUP_SOURCE", +}; + struct reply_info { struct pa_pdispatch *pdispatch; struct reply_info *next, *previous; - int (*callback)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); + void (*callback)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); void *userdata; uint32_t tag; void *mainloop_timeout; @@ -18,6 +34,9 @@ struct pa_pdispatch { const struct pa_pdispatch_command *command_table; unsigned n_commands; struct reply_info *replies; + void (*drain_callback)(struct pa_pdispatch *pd, void *userdata); + void *drain_userdata; + int in_use, shall_free; }; static void reply_info_free(struct reply_info *r) { @@ -49,11 +68,21 @@ struct pa_pdispatch* pa_pdispatch_new(struct pa_mainloop_api *mainloop, const st pd->command_table = table; pd->n_commands = entries; pd->replies = NULL; + pd->drain_callback = NULL; + pd->drain_userdata = NULL; + + pd->in_use = pd->shall_free = 0; return pd; } void pa_pdispatch_free(struct pa_pdispatch *pd) { assert(pd); + + if (pd->in_use) { + pd->shall_free = 1; + return; + } + while (pd->replies) reply_info_free(pd->replies); free(pd); @@ -61,60 +90,61 @@ void pa_pdispatch_free(struct pa_pdispatch *pd) { int pa_pdispatch_run(struct pa_pdispatch *pd, struct pa_packet*packet, void *userdata) { uint32_t tag, command; - assert(pd && packet); struct pa_tagstruct *ts = NULL; - assert(pd && packet && packet->data); + int ret = -1; + assert(pd && packet && packet->data && !pd->in_use); if (packet->length <= 8) - goto fail; + goto finish; ts = pa_tagstruct_new(packet->data, packet->length); assert(ts); if (pa_tagstruct_getu32(ts, &command) < 0 || pa_tagstruct_getu32(ts, &tag) < 0) - goto fail; + goto finish; + + /*fprintf(stderr, __FILE__": Recieved opcode <%s>\n", command_names[command]);*/ if (command == PA_COMMAND_ERROR || command == PA_COMMAND_REPLY) { struct reply_info *r; - int done = 0; for (r = pd->replies; r; r = r->next) { - if (r->tag == tag) { - int ret = r->callback(r->pdispatch, command, tag, ts, r->userdata); - reply_info_free(r); - - if (ret < 0) - goto fail; - - done = 1; + if (r->tag != tag) + continue; + + pd->in_use = 1; + assert(r->callback); + r->callback(r->pdispatch, command, tag, ts, r->userdata); + pd->in_use = 0; + reply_info_free(r); + + if (pd->shall_free) { + pa_pdispatch_free(pd); break; } - } - if (!done) - goto fail; + if (pd->drain_callback && !pa_pdispatch_is_pending(r->pdispatch)) + pd->drain_callback(r->pdispatch, r->pdispatch->drain_userdata); + + break; + } } else if (pd->command_table && command < pd->n_commands) { const struct pa_pdispatch_command *c = pd->command_table+command; - if (!c->proc) - goto fail; - - if (c->proc(pd, command, tag, ts, userdata) < 0) - goto fail; + if (c->proc) + c->proc(pd, command, tag, ts, userdata); } else - goto fail; - - pa_tagstruct_free(ts); - - return 0; + goto finish; -fail: + ret = 0; + +finish: if (ts) pa_tagstruct_free(ts); - return -1; + return ret; } static void timeout_callback(struct pa_mainloop_api*m, void *id, const struct timeval *tv, void *userdata) { @@ -123,9 +153,12 @@ static void timeout_callback(struct pa_mainloop_api*m, void *id, const struct ti r->callback(r->pdispatch, PA_COMMAND_TIMEOUT, r->tag, NULL, r->userdata); reply_info_free(r); + + if (r->pdispatch->drain_callback && !pa_pdispatch_is_pending(r->pdispatch)) + r->pdispatch->drain_callback(r->pdispatch, r->pdispatch->drain_userdata); } -void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata), void *userdata) { +void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int timeout, void (*cb)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata), void *userdata) { struct reply_info *r; struct timeval tv; assert(pd && cb); @@ -149,3 +182,17 @@ void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int time r->next->previous = r; pd->replies = r; } + +int pa_pdispatch_is_pending(struct pa_pdispatch *pd) { + assert(pd); + + return !!pd->replies; +} + +void pa_pdispatch_set_drain_callback(struct pa_pdispatch *pd, void (*cb)(struct pa_pdispatch *pd, void *userdata), void *userdata) { + assert(pd); + assert(!cb || pa_pdispatch_is_pending(pd)); + + pd->drain_callback = cb; + pd->drain_userdata = userdata; +} diff --git a/src/pdispatch.h b/src/pdispatch.h index 73686700..35e93829 100644 --- a/src/pdispatch.h +++ b/src/pdispatch.h @@ -8,8 +8,10 @@ struct pa_pdispatch; +/* It is safe to destroy the calling pdispatch object from all callbacks */ + struct pa_pdispatch_command { - int (*proc)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); + void (*proc)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); }; struct pa_pdispatch* pa_pdispatch_new(struct pa_mainloop_api *m, const struct pa_pdispatch_command*table, unsigned entries); @@ -17,6 +19,10 @@ void pa_pdispatch_free(struct pa_pdispatch *pd); int pa_pdispatch_run(struct pa_pdispatch *pd, struct pa_packet*p, void *userdata); -void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata), void *userdata); +void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int timeout, void (*cb)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata), void *userdata); + +int pa_pdispatch_is_pending(struct pa_pdispatch *pd); + +void pa_pdispatch_set_drain_callback(struct pa_pdispatch *pd, void (*cb)(struct pa_pdispatch *pd, void *userdata), void *userdata); #endif diff --git a/src/polyp.c b/src/polyp.c index c15d5d9f..9af8d468 100644 --- a/src/polyp.c +++ b/src/polyp.c @@ -11,10 +11,13 @@ #include "socket-client.h" #include "pstream-util.h" #include "authkey.h" +#include "util.h" -#define DEFAULT_QUEUE_LENGTH 10240 -#define DEFAULT_MAX_LENGTH 20480 +#define DEFAULT_MAXLENGTH 20480 +#define DEFAULT_TLENGTH 10240 #define DEFAULT_PREBUF 4096 +#define DEFAULT_MINREQ 1024 + #define DEFAULT_TIMEOUT (5*60) #define DEFAULT_SERVER "/tmp/polypaudio/native" @@ -28,25 +31,40 @@ struct pa_context { struct pa_stream *first_stream; uint32_t ctag; uint32_t error; - enum { CONTEXT_UNCONNECTED, CONTEXT_CONNECTING, CONTEXT_AUTHORIZING, CONTEXT_SETTING_NAME, CONTEXT_READY, CONTEXT_DEAD} state; + enum { + CONTEXT_UNCONNECTED, + CONTEXT_CONNECTING, + CONTEXT_AUTHORIZING, + CONTEXT_SETTING_NAME, + CONTEXT_READY, + CONTEXT_DEAD + } state; void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata); void *connect_complete_userdata; + void (*drain_complete_callback)(struct pa_context*c, void *userdata); + void *drain_complete_userdata; + void (*die_callback)(struct pa_context*c, void *userdata); void *die_userdata; - + uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH]; }; struct pa_stream { struct pa_context *context; struct pa_stream *next, *previous; + + char *name; + struct pa_buffer_attr buffer_attr; + struct pa_sample_spec sample_spec; uint32_t device_index; uint32_t channel; int channel_valid; enum pa_stream_direction direction; - enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state; + + enum { STREAM_LOOKING_UP, STREAM_CREATING, STREAM_READY, STREAM_DEAD} state; uint32_t requested_bytes; void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata); @@ -62,7 +80,7 @@ struct pa_stream { void *die_userdata; }; -static int command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { [PA_COMMAND_ERROR] = { NULL }, @@ -76,8 +94,9 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { }; struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) { - assert(mainloop && name); struct pa_context *c; + assert(mainloop && name); + c = malloc(sizeof(struct pa_context)); assert(c); c->name = strdup(name); @@ -95,9 +114,13 @@ struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char * c->connect_complete_callback = NULL; c->connect_complete_userdata = NULL; + c->drain_complete_callback = NULL; + c->drain_complete_userdata = NULL; + c->die_callback = NULL; c->die_userdata = NULL; - + + pa_check_for_sigpipe(); return c; } @@ -121,84 +144,105 @@ void pa_context_free(struct pa_context *c) { } static void stream_dead(struct pa_stream *s) { + assert(s); + if (s->state == STREAM_DEAD) return; - - s->state = STREAM_DEAD; - if (s->die_callback) - s->die_callback(s, s->die_userdata); + + if (s->state == STREAM_READY) { + s->state = STREAM_DEAD; + if (s->die_callback) + s->die_callback(s, s->die_userdata); + } else + s->state = STREAM_DEAD; } static void context_dead(struct pa_context *c) { struct pa_stream *s; assert(c); - for (s = c->first_stream; s; s = s->next) - stream_dead(s); - if (c->state == CONTEXT_DEAD) return; + + if (c->pdispatch) + pa_pdispatch_free(c->pdispatch); + c->pdispatch = NULL; + + if (c->pstream) + pa_pstream_free(c->pstream); + c->pstream = NULL; + + if (c->client) + pa_socket_client_free(c->client); + c->client = NULL; - c->state = CONTEXT_DEAD; - if (c->die_callback) - c->die_callback(c, c->die_userdata); + for (s = c->first_stream; s; s = s->next) + stream_dead(s); + + if (c->state == CONTEXT_READY) { + c->state = CONTEXT_DEAD; + if (c->die_callback) + c->die_callback(c, c->die_userdata); + } else + s->state = CONTEXT_DEAD; } static void pstream_die_callback(struct pa_pstream *p, void *userdata) { struct pa_context *c = userdata; assert(p && c); - - assert(c->state != CONTEXT_DEAD); - - c->state = CONTEXT_DEAD; - context_dead(c); } -static int pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) { +static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) { struct pa_context *c = userdata; assert(p && packet && c); if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) { fprintf(stderr, "polyp.c: invalid packet.\n"); context_dead(c); - return -1; } - - - return 0; } -static int pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) { +static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) { struct pa_context *c = userdata; struct pa_stream *s; assert(p && chunk && c && chunk->memblock && chunk->memblock->data); if (!(s = pa_dynarray_get(c->streams, channel))) - return 0; + return; if (s->read_callback) s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata); +} + +static int handle_error(struct pa_context *c, uint32_t command, struct pa_tagstruct *t) { + assert(c && t); - return 0; + if (command == PA_COMMAND_ERROR) { + if (pa_tagstruct_getu32(t, &c->error) < 0) { + c->error = PA_ERROR_PROTOCOL; + return -1; + } + + return 0; + } + + c->error = (command == PA_COMMAND_TIMEOUT) ? PA_ERROR_TIMEOUT : PA_ERROR_INTERNAL; + return -1; } -static int auth_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { +static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct pa_context *c = userdata; assert(pd && c && (c->state == CONTEXT_AUTHORIZING || c->state == CONTEXT_SETTING_NAME)); if (command != PA_COMMAND_REPLY) { - if (command == PA_COMMAND_ERROR && pa_tagstruct_getu32(t, &c->error) < 0) - c->error = PA_ERROR_PROTOCOL; - else if (command == PA_COMMAND_TIMEOUT) - c->error = PA_ERROR_TIMEOUT; - - c->state = CONTEXT_DEAD; + handle_error(c, command, t); + context_dead(c); if (c->connect_complete_callback) c->connect_complete_callback(c, 0, c->connect_complete_userdata); - return -1; + return; } if (c->state == CONTEXT_AUTHORIZING) { @@ -210,7 +254,7 @@ static int auth_complete_callback(struct pa_pdispatch *pd, uint32_t command, uin pa_tagstruct_putu32(t, tag = c->ctag++); pa_tagstruct_puts(t, c->name); pa_pstream_send_tagstruct(c->pstream, t); - pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, auth_complete_callback, c); + pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c); } else { assert(c->state == CONTEXT_SETTING_NAME); @@ -220,7 +264,7 @@ static int auth_complete_callback(struct pa_pdispatch *pd, uint32_t command, uin c->connect_complete_callback(c, 1, c->connect_complete_userdata); } - return 0; + return; } static void on_connection(struct pa_socket_client *client, struct pa_iochannel*io, void *userdata) { @@ -234,7 +278,7 @@ static void on_connection(struct pa_socket_client *client, struct pa_iochannel*i if (!io) { c->error = PA_ERROR_CONNECTIONREFUSED; - c->state = CONTEXT_UNCONNECTED; + context_dead(c); if (c->connect_complete_callback) c->connect_complete_callback(c, 0, c->connect_complete_userdata); @@ -257,7 +301,7 @@ static void on_connection(struct pa_socket_client *client, struct pa_iochannel*i pa_tagstruct_putu32(t, tag = c->ctag++); pa_tagstruct_put_arbitrary(t, c->auth_cookie, sizeof(c->auth_cookie)); pa_pstream_send_tagstruct(c->pstream, t); - pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, auth_complete_callback, c); + pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c); c->state = CONTEXT_AUTHORIZING; } @@ -305,7 +349,7 @@ void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_cont c->die_userdata = userdata; } -static int command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { +static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct pa_stream *s; struct pa_context *c = userdata; uint32_t bytes, channel; @@ -315,63 +359,122 @@ static int command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t t pa_tagstruct_getu32(t, &bytes) < 0 || !pa_tagstruct_eof(t)) { c->error = PA_ERROR_PROTOCOL; - return -1; + context_dead(c); + return; } - if (!(s = pa_dynarray_get(c->streams, channel))) { - c->error = PA_ERROR_PROTOCOL; - return -1; - } + if (!(s = pa_dynarray_get(c->streams, channel))) + return; - /*fprintf(stderr, "Requested %u bytes\n", bytes);*/ + if (s->state != STREAM_READY) + return; s->requested_bytes += bytes; if (s->requested_bytes && s->write_callback) s->write_callback(s, s->requested_bytes, s->write_userdata); - - return 0; } -static int create_playback_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { - int ret = 0; +static void create_playback_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct pa_stream *s = userdata; assert(pd && s && s->state == STREAM_CREATING); if (command != PA_COMMAND_REPLY) { - struct pa_context *c = s->context; - assert(c); + if (handle_error(s->context, command, t) < 0) { + context_dead(s->context); + return; + } - if (command == PA_COMMAND_ERROR && pa_tagstruct_getu32(t, &s->context->error) < 0) - s->context->error = PA_ERROR_PROTOCOL; - else if (command == PA_COMMAND_TIMEOUT) - s->context->error = PA_ERROR_TIMEOUT; - - ret = -1; - goto fail; + stream_dead(s); + if (s->create_complete_callback) + s->create_complete_callback(s, 0, s->create_complete_userdata); + + return; } if (pa_tagstruct_getu32(t, &s->channel) < 0 || pa_tagstruct_getu32(t, &s->device_index) < 0 || !pa_tagstruct_eof(t)) { s->context->error = PA_ERROR_PROTOCOL; - ret = -1; - goto fail; + context_dead(s->context); + return; } s->channel_valid = 1; pa_dynarray_put(s->context->streams, s->channel, s); s->state = STREAM_READY; - assert(s->create_complete_callback); - s->create_complete_callback(s, 1, s->create_complete_userdata); - return 0; + if (s->create_complete_callback) + s->create_complete_callback(s, 1, s->create_complete_userdata); +} + +static void create_stream(struct pa_stream *s, uint32_t tdev_index) { + struct pa_tagstruct *t; + uint32_t tag; + assert(s); -fail: - assert(s->create_complete_callback); - s->create_complete_callback(s, 0, s->create_complete_userdata); - pa_stream_free(s); - return ret; + s->state = STREAM_CREATING; + + t = pa_tagstruct_new(NULL, 0); + assert(t); + + pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM); + pa_tagstruct_putu32(t, tag = s->context->ctag++); + pa_tagstruct_puts(t, s->name); + pa_tagstruct_put_sample_spec(t, &s->sample_spec); + pa_tagstruct_putu32(t, tdev_index); + pa_tagstruct_putu32(t, s->buffer_attr.maxlength); + pa_tagstruct_putu32(t, s->buffer_attr.tlength); + pa_tagstruct_putu32(t, s->buffer_attr.prebuf); + pa_tagstruct_putu32(t, s->buffer_attr.minreq); + + pa_pstream_send_tagstruct(s->context->pstream, t); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s); +} + +static void lookup_device_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { + struct pa_stream *s = userdata; + uint32_t tdev; + assert(pd && s && s->state == STREAM_LOOKING_UP); + + if (command != PA_COMMAND_REPLY) { + if (handle_error(s->context, command, t) < 0) { + context_dead(s->context); + return; + } + + stream_dead(s); + if (s->create_complete_callback) + s->create_complete_callback(s, 0, s->create_complete_userdata); + return; + } + + if (pa_tagstruct_getu32(t, &tdev) < 0 || + !pa_tagstruct_eof(t)) { + s->context->error = PA_ERROR_PROTOCOL; + context_dead(s->context); + return; + } + + create_stream(s, tdev); +} + +static void lookup_device(struct pa_stream *s, const char *tdev) { + struct pa_tagstruct *t; + uint32_t tag; + assert(s); + + s->state = STREAM_LOOKING_UP; + + t = pa_tagstruct_new(NULL, 0); + assert(t); + + pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_LOOKUP_SINK : PA_COMMAND_LOOKUP_SOURCE); + pa_tagstruct_putu32(t, tag = s->context->ctag++); + pa_tagstruct_puts(t, tdev); + + pa_pstream_send_tagstruct(s->context->pstream, t); + pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, lookup_device_callback, s); } struct pa_stream* pa_stream_new( @@ -385,10 +488,8 @@ struct pa_stream* pa_stream_new( void *userdata) { struct pa_stream *s; - struct pa_tagstruct *t; - uint32_t tag; - assert(c && name && ss && c->state == CONTEXT_READY && complete); + assert(c && name && ss && c->state == CONTEXT_READY); s = malloc(sizeof(struct pa_stream)); assert(s); @@ -403,42 +504,43 @@ struct pa_stream* pa_stream_new( s->create_complete_callback = complete; s->create_complete_userdata = NULL; + s->name = strdup(name); s->state = STREAM_CREATING; s->requested_bytes = 0; s->channel = 0; s->channel_valid = 0; s->device_index = (uint32_t) -1; s->direction = dir; + s->sample_spec = *ss; + if (attr) + s->buffer_attr = *attr; + else { + s->buffer_attr.maxlength = DEFAULT_MAXLENGTH; + s->buffer_attr.tlength = DEFAULT_TLENGTH; + s->buffer_attr.prebuf = DEFAULT_PREBUF; + s->buffer_attr.minreq = DEFAULT_MINREQ; + } - t = pa_tagstruct_new(NULL, 0); - assert(t); - - pa_tagstruct_putu32(t, dir == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM); - pa_tagstruct_putu32(t, tag = c->ctag++); - pa_tagstruct_puts(t, name); - pa_tagstruct_put_sample_spec(t, ss); - pa_tagstruct_putu32(t, (uint32_t) -1); - pa_tagstruct_putu32(t, attr ? attr->queue_length : DEFAULT_QUEUE_LENGTH); - pa_tagstruct_putu32(t, attr ? attr->max_length : DEFAULT_MAX_LENGTH); - pa_tagstruct_putu32(t, attr ? attr->prebuf : DEFAULT_PREBUF); - - pa_pstream_send_tagstruct(c->pstream, t); - - pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s); - s->next = c->first_stream; if (s->next) s->next->previous = s; s->previous = NULL; c->first_stream = s; - return 0; + if (dev) + lookup_device(s, dev); + else + create_stream(s, (uint32_t) -1); + + return s; } void pa_stream_free(struct pa_stream *s) { assert(s && s->context); + + free(s->name); - if (s->channel_valid) { + if (s->channel_valid && s->context->state == CONTEXT_READY) { struct pa_tagstruct *t = pa_tagstruct_new(NULL, 0); assert(t); @@ -469,7 +571,7 @@ void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stre void pa_stream_write(struct pa_stream *s, const void *data, size_t length) { struct pa_memchunk chunk; - assert(s && s->context && data && length); + assert(s && s->context && data && length && s->state == STREAM_READY); chunk.memblock = pa_memblock_new(length); assert(chunk.memblock && chunk.memblock->data); @@ -489,7 +591,7 @@ void pa_stream_write(struct pa_stream *s, const void *data, size_t length) { } size_t pa_stream_writable_size(struct pa_stream *s) { - assert(s); + assert(s && s->state == STREAM_READY); return s->requested_bytes; } @@ -512,3 +614,72 @@ void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream s->die_callback = cb; s->die_userdata = userdata; } + +int pa_context_is_pending(struct pa_context *c) { + assert(c); + + if (c->state != CONTEXT_READY) + return 0; + + return pa_pstream_is_pending(c->pstream) || pa_pdispatch_is_pending(c->pdispatch); +} + +struct pa_context* pa_stream_get_context(struct pa_stream *p) { + assert(p); + return p->context; +} + +static void set_dispatch_callbacks(struct pa_context *c); + +static void pdispatch_drain_callback(struct pa_pdispatch*pd, void *userdata) { + set_dispatch_callbacks(userdata); +} + +static void pstream_drain_callback(struct pa_pstream *s, void *userdata) { + set_dispatch_callbacks(userdata); +} + +static void set_dispatch_callbacks(struct pa_context *c) { + assert(c && c->state == CONTEXT_READY); + + pa_pstream_set_drain_callback(c->pstream, NULL, NULL); + pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL); + + if (pa_pdispatch_is_pending(c->pdispatch)) { + pa_pdispatch_set_drain_callback(c->pdispatch, pdispatch_drain_callback, c); + return; + } + + if (pa_pstream_is_pending(c->pstream)) { + pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c); + return; + } + + assert(c->drain_complete_callback); + c->drain_complete_callback(c, c->drain_complete_userdata); +} + +int pa_context_drain( + struct pa_context *c, + void (*complete) (struct pa_context*c, void *userdata), + void *userdata) { + + assert(c && c->state == CONTEXT_READY); + + if (complete == NULL) { + c->drain_complete_callback = NULL; + pa_pstream_set_drain_callback(c->pstream, NULL, NULL); + pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL); + return 0; + } + + if (!pa_context_is_pending(c)) + return -1; + + c->drain_complete_callback = complete; + c->drain_complete_userdata = userdata; + + set_dispatch_callbacks(c); + + return 0; +} diff --git a/src/polyp.h b/src/polyp.h index 77a6966f..25ee3bed 100644 --- a/src/polyp.h +++ b/src/polyp.h @@ -17,6 +17,11 @@ int pa_context_connect( void (*complete) (struct pa_context*c, int success, void *userdata), void *userdata); +int pa_context_drain( + struct pa_context *c, + void (*complete) (struct pa_context*c, void *userdata), + void *userdata); + void pa_context_free(struct pa_context *c); void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata); @@ -25,6 +30,8 @@ int pa_context_is_dead(struct pa_context *c); int pa_context_is_ready(struct pa_context *c); int pa_context_errno(struct pa_context *c); +int pa_context_is_pending(struct pa_context *c); + struct pa_stream; struct pa_stream* pa_stream_new( @@ -50,4 +57,6 @@ void pa_stream_set_read_callback(struct pa_stream *p, void (*cb)(struct pa_strea int pa_stream_is_dead(struct pa_stream *p); int pa_stream_is_ready(struct pa_stream*p); +struct pa_context* pa_stream_get_context(struct pa_stream *p); + #endif diff --git a/src/polypdef.h b/src/polypdef.h index aa6e6bf6..80b3cc6c 100644 --- a/src/polypdef.h +++ b/src/polypdef.h @@ -9,9 +9,10 @@ enum pa_stream_direction { }; struct pa_buffer_attr { - uint32_t queue_length; - uint32_t max_length; + uint32_t maxlength; + uint32_t tlength; uint32_t prebuf; + uint32_t minreq; }; diff --git a/src/protocol-esound.c b/src/protocol-esound.c index cd6448fc..04006d5d 100644 --- a/src/protocol-esound.c +++ b/src/protocol-esound.c @@ -17,9 +17,7 @@ #define COOKIE_FILE ".esd_auth" -#define MEMBLOCKQ_LENGTH (10*1204) -#define MEMBLOCKQ_PREBUF (2*1024) -#define BUFSIZE (1024) +#define BUFFER_SECONDS (0.5) /* This is heavily based on esound's code */ @@ -196,6 +194,7 @@ static int esd_proto_stream_play(struct connection *c, const void *data, size_t int format, rate; struct pa_sink *sink; struct pa_sample_spec ss; + size_t l; assert(length == (sizeof(int)*2+ESD_NAME_MAX)); format = maybe_swap_endian_32(c->swap_byte_order, *(int*)data); @@ -217,7 +216,9 @@ static int esd_proto_stream_play(struct connection *c, const void *data, size_t pa_client_rename(c->client, name); assert(!c->input_memblockq); - c->input_memblockq = pa_memblockq_new(MEMBLOCKQ_LENGTH, pa_sample_size(&ss), MEMBLOCKQ_PREBUF); + + l = (size_t) (pa_bytes_per_second(&ss)*BUFFER_SECONDS); + c->input_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&ss), l/2, l/10); assert(c->input_memblockq); assert(!c->sink_input); @@ -252,7 +253,7 @@ static int esd_proto_get_latency(struct connection *c, const void *data, size_t latency = 0; else { float usec = pa_sink_get_latency(sink); - usec += pa_samples_usec(MEMBLOCKQ_LENGTH-BUFSIZE, &sink->sample_spec); + usec += BUFFER_SECONDS*1000000*.9; /* A better estimation would be a good idea! */ latency = (int) ((usec*44100)/1000000); } @@ -452,16 +453,17 @@ static int do_read(struct connection *c) { } else if (c->state == ESD_STREAMING_DATA) { struct pa_memchunk chunk; ssize_t r; + size_t l; assert(c->input_memblockq); - if (!pa_memblockq_is_writable(c->input_memblockq, BUFSIZE)) + if (!(l = pa_memblockq_missing(c->input_memblockq))) return 0; - chunk.memblock = pa_memblock_new(BUFSIZE); + chunk.memblock = pa_memblock_new(l); assert(chunk.memblock && chunk.memblock->data); - if ((r = pa_iochannel_read(c->io, chunk.memblock->data, BUFSIZE)) <= 0) { + if ((r = pa_iochannel_read(c->io, chunk.memblock->data, l)) <= 0) { fprintf(stderr, "protocol-esound.c: read() failed: %s\n", r == 0 ? "EOF" : strerror(errno)); pa_memblock_unref(chunk.memblock); return -1; diff --git a/src/protocol-native-spec.h b/src/protocol-native-spec.h index 07fc735b..7fb9ac4a 100644 --- a/src/protocol-native-spec.h +++ b/src/protocol-native-spec.h @@ -13,6 +13,8 @@ enum { PA_COMMAND_REQUEST, PA_COMMAND_AUTH, PA_COMMAND_SET_NAME, + PA_COMMAND_LOOKUP_SINK, + PA_COMMAND_LOOKUP_SOURCE, PA_COMMAND_MAX }; diff --git a/src/protocol-native.c b/src/protocol-native.c index 9463a469..42ff4b52 100644 --- a/src/protocol-native.c +++ b/src/protocol-native.c @@ -14,6 +14,7 @@ #include "pdispatch.h" #include "pstream-util.h" #include "authkey.h" +#include "namereg.h" struct connection; struct pa_protocol_native; @@ -28,7 +29,6 @@ struct record_stream { struct playback_stream { struct connection *connection; uint32_t index; - size_t qlength; struct pa_sink_input *sink_input; struct pa_memblockq *memblockq; size_t requested_bytes; @@ -58,11 +58,12 @@ static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i); static void request_bytes(struct playback_stream*s); -static int command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); -static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); -static int command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); -static int command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); -static int command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +static void command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); +static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { [PA_COMMAND_ERROR] = { NULL }, @@ -76,6 +77,8 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { [PA_COMMAND_REQUEST] = { NULL }, [PA_COMMAND_EXIT] = { command_exit }, [PA_COMMAND_SET_NAME] = { command_set_name }, + [PA_COMMAND_LOOKUP_SINK] = { command_lookup }, + [PA_COMMAND_LOOKUP_SOURCE] = { command_lookup }, }; /* structure management */ @@ -89,14 +92,17 @@ static void record_stream_free(struct record_stream* r) { free(r); } -static struct playback_stream* playback_stream_new(struct connection *c, struct pa_sink *sink, struct pa_sample_spec *ss, const char *name, size_t qlen, size_t maxlength, size_t prebuf) { +static struct playback_stream* playback_stream_new(struct connection *c, struct pa_sink *sink, struct pa_sample_spec *ss, const char *name, + size_t maxlength, + size_t tlength, + size_t prebuf, + size_t minreq) { struct playback_stream *s; - assert(c && sink && ss && name && qlen && maxlength && prebuf); + assert(c && sink && ss && name && maxlength); s = malloc(sizeof(struct playback_stream)); assert (s); s->connection = c; - s->qlength = qlen; s->sink_input = pa_sink_input_new(sink, name, ss); assert(s->sink_input); @@ -106,7 +112,7 @@ static struct playback_stream* playback_stream_new(struct connection *c, struct s->sink_input->get_latency = sink_input_get_latency_cb; s->sink_input->userdata = s; - s->memblockq = pa_memblockq_new(maxlength, pa_sample_size(ss), prebuf); + s->memblockq = pa_memblockq_new(maxlength, tlength, pa_sample_size(ss), prebuf, minreq); assert(s->memblockq); s->requested_bytes = 0; @@ -149,13 +155,17 @@ static void request_bytes(struct playback_stream *s) { size_t l; assert(s); - if (!(l = pa_memblockq_missing_to(s->memblockq, s->qlength))) + if (!(l = pa_memblockq_missing(s->memblockq))) return; if (l <= s->requested_bytes) return; l -= s->requested_bytes; + + if (l < pa_memblockq_get_minreq(s->memblockq)) + return; + s->requested_bytes += l; t = pa_tagstruct_new(NULL, 0); @@ -166,7 +176,7 @@ static void request_bytes(struct playback_stream *s) { pa_tagstruct_putu32(t, l); pa_pstream_send_tagstruct(s->connection->pstream, t); -/* fprintf(stderr, "Requesting %u bytes\n", l);*/ + /*fprintf(stderr, "Requesting %u bytes\n", l);*/ } /*** sinkinput callbacks ***/ @@ -209,10 +219,15 @@ static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i) { /*** pdispatch callbacks ***/ -static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { +static void protocol_error(struct connection *c) { + fprintf(stderr, __FILE__": protocol error, kicking client\n"); + connection_free(c); +} + +static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct connection *c = userdata; struct playback_stream *s; - size_t maxlength, prebuf, qlength; + size_t maxlength, tlength, prebuf, minreq; uint32_t sink_index; const char *name; struct pa_sample_spec ss; @@ -223,15 +238,18 @@ static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t comm if (pa_tagstruct_gets(t, &name) < 0 || pa_tagstruct_get_sample_spec(t, &ss) < 0 || pa_tagstruct_getu32(t, &sink_index) < 0 || - pa_tagstruct_getu32(t, &qlength) < 0 || pa_tagstruct_getu32(t, &maxlength) < 0 || + pa_tagstruct_getu32(t, &tlength) < 0 || pa_tagstruct_getu32(t, &prebuf) < 0 || - !pa_tagstruct_eof(t)) - return -1; + pa_tagstruct_getu32(t, &minreq) < 0 || + !pa_tagstruct_eof(t)) { + protocol_error(c); + return; + } if (!c->authorized) { pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); - return 0; + return; } if (sink_index == (uint32_t) -1) @@ -241,12 +259,12 @@ static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t comm if (!sink) { pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST); - return 0; + return; } - if (!(s = playback_stream_new(c, sink, &ss, name, qlength, maxlength, prebuf))) { + if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, tlength, prebuf, minreq))) { pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID); - return 0; + return; } reply = pa_tagstruct_new(NULL, 0); @@ -258,107 +276,148 @@ static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t comm pa_tagstruct_putu32(reply, s->sink_input->index); pa_pstream_send_tagstruct(c->pstream, reply); request_bytes(s); - return 0; } -static int command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { +static void command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct connection *c = userdata; uint32_t channel; struct playback_stream *s; assert(c && t); if (pa_tagstruct_getu32(t, &channel) < 0 || - !pa_tagstruct_eof(t)) - return -1; + !pa_tagstruct_eof(t)) { + protocol_error(c); + return; + } if (!c->authorized) { pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); - return 0; + return; } if (!(s = pa_idxset_get_by_index(c->playback_streams, channel))) { pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST); - return 0; + return; } pa_pstream_send_simple_ack(c->pstream, tag); - return 0; } -static int command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { +static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct connection *c = userdata; assert(c && t); - if (!pa_tagstruct_eof(t)) - return -1; + if (!pa_tagstruct_eof(t)) { + protocol_error(c); + return; + } if (!c->authorized) { pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); - return 0; + return; } assert(c->protocol && c->protocol->core && c->protocol->core->mainloop); c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0); pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */ - return 0; + return; } -static int command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { +static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct connection *c = userdata; const void*cookie; assert(c && t); if (pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 || - !pa_tagstruct_eof(t)) - return -1; - + !pa_tagstruct_eof(t)) { + protocol_error(c); + return; + } + if (memcmp(c->protocol->auth_cookie, cookie, PA_NATIVE_COOKIE_LENGTH) != 0) { fprintf(stderr, "protocol-native.c: Denied access to client with invalid authorization key.\n"); pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); - return 0; + return; } c->authorized = 1; pa_pstream_send_simple_ack(c->pstream, tag); - return 0; + return; } -static int command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { +static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { struct connection *c = userdata; const char *name; assert(c && t); if (pa_tagstruct_gets(t, &name) < 0 || - !pa_tagstruct_eof(t)) - return -1; + !pa_tagstruct_eof(t)) { + protocol_error(c); + return; + } pa_client_rename(c->client, name); pa_pstream_send_simple_ack(c->pstream, tag); - return 0; + return; +} + +static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { + struct connection *c = userdata; + const char *name; + uint32_t index = PA_IDXSET_INVALID; + assert(c && t); + + if (pa_tagstruct_gets(t, &name) < 0 || + !pa_tagstruct_eof(t)) { + protocol_error(c); + return; + } + + if (command == PA_COMMAND_LOOKUP_SINK) { + struct pa_sink *sink; + if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK))) + index = sink->index; + } else { + struct pa_source *source; + assert(command == PA_COMMAND_LOOKUP_SOURCE); + if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE))) + index = source->index; + } + + if (index == PA_IDXSET_INVALID) + pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY); + else { + struct pa_tagstruct *reply; + reply = pa_tagstruct_new(NULL, 0); + assert(reply); + pa_tagstruct_putu32(reply, PA_COMMAND_REPLY); + pa_tagstruct_putu32(reply, tag); + pa_tagstruct_putu32(reply, index); + pa_pstream_send_tagstruct(c->pstream, reply); + } } /*** pstream callbacks ***/ -static int packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) { +static void packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) { struct connection *c = userdata; assert(p && packet && packet->data && c); if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) { fprintf(stderr, "protocol-native: invalid packet.\n"); - return -1; + connection_free(c); } - - return 0; } -static int memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) { +static void memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) { struct connection *c = userdata; struct playback_stream *stream; assert(p && chunk && userdata); if (!(stream = pa_idxset_get_by_index(c->playback_streams, channel))) { fprintf(stderr, "protocol-native: client sent block for invalid stream.\n"); - return -1; + connection_free(c); + return; } if (chunk->length >= stream->requested_bytes) @@ -371,8 +430,6 @@ static int memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t del pa_sink_notify(stream->sink_input->sink); /*fprintf(stderr, "Recieved %u bytes.\n", chunk->length);*/ - - return 0; } static void die_callback(struct pa_pstream *p, void *userdata) { diff --git a/src/protocol-simple.c b/src/protocol-simple.c index 380b1802..91eab59a 100644 --- a/src/protocol-simple.c +++ b/src/protocol-simple.c @@ -28,7 +28,8 @@ struct pa_protocol_simple { struct pa_sample_spec sample_spec; }; -#define BUFSIZE PIPE_BUF +#define PLAYBACK_BUFFER_SECONDS (.5) +#define RECORD_BUFFER_SECONDS (5) static void connection_free(struct connection *c) { assert(c); @@ -52,17 +53,18 @@ static void connection_free(struct connection *c) { static int do_read(struct connection *c) { struct pa_memchunk chunk; ssize_t r; + size_t l; if (!pa_iochannel_is_readable(c->io)) return 0; - if (!c->sink_input || !pa_memblockq_is_writable(c->input_memblockq, BUFSIZE)) + if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq))) return 0; - - chunk.memblock = pa_memblock_new(BUFSIZE); + + chunk.memblock = pa_memblock_new(l); assert(chunk.memblock); - if ((r = pa_iochannel_read(c->io, chunk.memblock->data, BUFSIZE)) <= 0) { + if ((r = pa_iochannel_read(c->io, chunk.memblock->data, l)) <= 0) { fprintf(stderr, "read(): %s\n", r == 0 ? "EOF" : strerror(errno)); pa_memblock_unref(chunk.memblock); return -1; @@ -213,8 +215,8 @@ static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, vo c->source_output->kill = source_output_kill_cb; c->source_output->userdata = c; - l = 5*pa_bytes_per_second(&p->sample_spec); /* 5s */ - c->output_memblockq = pa_memblockq_new(l, pa_sample_size(&p->sample_spec), l/2); + l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS); + c->output_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&p->sample_spec), l/2, 0); } if (p->mode & PA_PROTOCOL_SIMPLE_PLAYBACK) { @@ -234,8 +236,8 @@ static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, vo c->sink_input->get_latency = sink_input_get_latency_cb; c->sink_input->userdata = c; - l = pa_bytes_per_second(&p->sample_spec)/2; /* half a second */ - c->input_memblockq = pa_memblockq_new(l, pa_sample_size(&p->sample_spec), l/2); + l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS); + c->input_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&p->sample_spec), l/2, l/10); } diff --git a/src/pstream.c b/src/pstream.c index 1739780e..19f83e30 100644 --- a/src/pstream.c +++ b/src/pstream.c @@ -35,6 +35,8 @@ struct pa_pstream { struct pa_iochannel *io; struct pa_queue *send_queue; + int in_use, shall_free; + int dead; void (*die_callback) (struct pa_pstream *p, void *userdad); void *die_callback_userdata; @@ -46,9 +48,6 @@ struct pa_pstream { size_t index; } write; - void (*send_callback) (struct pa_pstream *p, void *userdata); - void *send_callback_userdata; - struct { struct pa_memblock *memblock; struct pa_packet *packet; @@ -57,34 +56,51 @@ struct pa_pstream { size_t index; } read; - int (*recieve_packet_callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata); + void (*recieve_packet_callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata); void *recieve_packet_callback_userdata; - int (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata); + void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata); void *recieve_memblock_callback_userdata; + + void (*drain_callback)(struct pa_pstream *p, void *userdata); + void *drain_userdata; }; static void do_write(struct pa_pstream *p); static void do_read(struct pa_pstream *p); -static void io_callback(struct pa_iochannel*io, void *userdata) { - struct pa_pstream *p = userdata; - assert(p && p->io == io); - +static void do_something(struct pa_pstream *p) { + assert(p && !p->shall_free); p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0); - + + p->in_use = 1; do_write(p); + p->in_use = 0; + + if (p->shall_free) { + pa_pstream_free(p); + return; + } + + p->in_use = 1; do_read(p); + p->in_use = 0; + if (p->shall_free) { + pa_pstream_free(p); + return; + } +} + +static void io_callback(struct pa_iochannel*io, void *userdata) { + struct pa_pstream *p = userdata; + assert(p && p->io == io); + do_something(p); } static void fixed_callback(struct pa_mainloop_api *m, void *id, void*userdata) { struct pa_pstream *p = userdata; assert(p && p->mainloop_source == id && p->mainloop == m); - - p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0); - - do_write(p); - do_read(p); + do_something(p); } struct pa_pstream *pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel *io) { @@ -115,15 +131,17 @@ struct pa_pstream *pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel p->read.packet = NULL; p->read.index = 0; - p->send_callback = NULL; - p->send_callback_userdata = NULL; - p->recieve_packet_callback = NULL; p->recieve_packet_callback_userdata = NULL; p->recieve_memblock_callback = NULL; p->recieve_memblock_callback_userdata = NULL; + p->drain_callback = NULL; + p->drain_userdata = NULL; + + p->in_use = p->shall_free = 0; + return p; } @@ -146,6 +164,12 @@ static void item_free(void *item, void *p) { void pa_pstream_free(struct pa_pstream *p) { assert(p); + if (p->in_use) { + /* If this pstream object is used by someone else on the call stack, we have to postpone the freeing */ + p->dead = p->shall_free = 1; + return; + } + pa_iochannel_free(p->io); pa_queue_free(p->send_queue, item_free, NULL); @@ -162,13 +186,6 @@ void pa_pstream_free(struct pa_pstream *p) { free(p); } -void pa_pstream_set_send_callback(struct pa_pstream*p, void (*callback) (struct pa_pstream *p, void *userdata), void *userdata) { - assert(p && callback); - - p->send_callback = callback; - p->send_callback_userdata = userdata; -} - void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet) { struct item_info *i; assert(p && packet); @@ -199,14 +216,14 @@ void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t del p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1); } -void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, int (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata) { +void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata) { assert(p && callback); p->recieve_packet_callback = callback; p->recieve_packet_callback_userdata = userdata; } -void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, int (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata), void *userdata) { +void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata), void *userdata) { assert(p && callback); p->recieve_memblock_callback = callback; @@ -261,7 +278,7 @@ static void do_write(struct pa_pstream *p) { l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE); } - if ((r = pa_iochannel_write(p->io, d, l)) < 0) + if ((r = pa_iochannel_write(p->io, d, l)) < 0) goto die; p->write.index += r; @@ -271,8 +288,8 @@ static void do_write(struct pa_pstream *p) { item_free(p->write.current, (void *) 1); p->write.current = NULL; - if (p->send_callback && pa_queue_is_empty(p->send_queue)) - p->send_callback(p, p->send_callback_userdata); + if (p->drain_callback && !pa_pstream_is_pending(p)) + p->drain_callback(p, p->drain_userdata); } return; @@ -341,13 +358,14 @@ static void do_read(struct pa_pstream *p) { chunk.memblock = p->read.memblock; chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l; chunk.length = l; - - if (p->recieve_memblock_callback(p, - ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), - (int32_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]), - &chunk, - p->recieve_memblock_callback_userdata) < 0) - goto die; + + if (p->recieve_memblock_callback) + p->recieve_memblock_callback( + p, + ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), + (int32_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]), + &chunk, + p->recieve_memblock_callback_userdata); } } @@ -359,17 +377,13 @@ static void do_read(struct pa_pstream *p) { pa_memblock_unref(p->read.memblock); p->read.memblock = NULL; } else { - int r = 0; assert(p->read.packet); - + if (p->recieve_packet_callback) - r = p->recieve_packet_callback(p, p->read.packet, p->recieve_packet_callback_userdata); + p->recieve_packet_callback(p, p->read.packet, p->recieve_packet_callback_userdata); pa_packet_unref(p->read.packet); p->read.packet = NULL; - - if (r < 0) - goto die; } p->read.index = 0; @@ -390,3 +404,21 @@ void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct p p->die_callback = callback; p->die_callback_userdata = userdata; } + +int pa_pstream_is_pending(struct pa_pstream *p) { + assert(p); + + if (p->dead) + return 0; + + return p->write.current || !pa_queue_is_empty(p->send_queue); +} + +void pa_pstream_set_drain_callback(struct pa_pstream *p, void (*cb)(struct pa_pstream *p, void *userdata), void *userdata) { + assert(p); + assert(!cb || pa_pstream_is_pending(p)); + + p->drain_callback = cb; + p->drain_userdata = userdata; +} + diff --git a/src/pstream.h b/src/pstream.h index 0f5975d2..011b8d12 100644 --- a/src/pstream.h +++ b/src/pstream.h @@ -9,18 +9,22 @@ #include "mainloop-api.h" #include "memchunk.h" +/* It is safe to destroy the calling pstream object from all callbacks */ + struct pa_pstream; struct pa_pstream* pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel *io); void pa_pstream_free(struct pa_pstream*p); -void pa_pstream_set_send_callback(struct pa_pstream*p, void (*callback) (struct pa_pstream *p, void *userdata), void *userdata); void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet); void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk); -void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, int (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata); -void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, int (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata), void *userdata); +void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata); +void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata), void *userdata); +void pa_pstream_set_drain_callback(struct pa_pstream *p, void (*cb)(struct pa_pstream *p, void *userdata), void *userdata); void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct pa_pstream *p, void *userdata), void *userdata); +int pa_pstream_is_pending(struct pa_pstream *p); + #endif diff --git a/src/sample.h b/src/sample.h index 4131992c..df12924b 100644 --- a/src/sample.h +++ b/src/sample.h @@ -32,6 +32,8 @@ uint32_t pa_samples_usec(size_t length, const struct pa_sample_spec *spec); int pa_sample_spec_valid(const struct pa_sample_spec *spec); int pa_sample_spec_equal(const struct pa_sample_spec*a, const struct pa_sample_spec*b); + +#define PA_SAMPLE_SNPRINT_MAX_LENGTH 32 void pa_sample_snprint(char *s, size_t l, const struct pa_sample_spec *spec); #endif diff --git a/src/simple.c b/src/simple.c index c1d1e96c..cf31ac52 100644 --- a/src/simple.c +++ b/src/simple.c @@ -14,6 +14,27 @@ struct pa_simple { int dead; }; +static int iterate(struct pa_simple *p, int block, int *perror) { + assert(p && p->context && p->mainloop && perror); + + if (!block && !pa_context_is_pending(p->context)) + return 0; + + do { + if (pa_context_is_dead(p->context) || (p->stream && pa_stream_is_dead(p->stream))) { + *perror = pa_context_errno(p->context); + return -1; + } + + if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0) { + *perror = PA_ERROR_INTERNAL; + return -1; + } + } while (pa_context_is_pending(p->context)); + + return 0; +} + struct pa_simple* pa_simple_new( const char *server, const char *name, @@ -44,26 +65,18 @@ struct pa_simple* pa_simple_new( goto fail; } + /* Wait until the context is ready */ while (!pa_context_is_ready(p->context)) { - if (pa_context_is_dead(p->context)) { - error = pa_context_errno(p->context); - goto fail; - } - - if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0) + if (iterate(p, 1, &error) < 0) goto fail; } if (!(p->stream = pa_stream_new(p->context, dir, dev, stream_name, ss, attr, NULL, NULL))) goto fail; + /* Wait until the stream is ready */ while (!pa_stream_is_ready(p->stream)) { - if (pa_stream_is_dead(p->stream)) { - error = pa_context_errno(p->context); - goto fail; - } - - if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0) + if (iterate(p, 1, &error) < 0) goto fail; } @@ -96,17 +109,9 @@ int pa_simple_write(struct pa_simple *p, const void*data, size_t length, int *pe while (length > 0) { size_t l; - while (!(l = pa_stream_writable_size(p->stream))) { - if (pa_context_is_dead(p->context)) { - *perror = pa_context_errno(p->context); + while (!(l = pa_stream_writable_size(p->stream))) + if (iterate(p, 1, perror) < 0) return -1; - } - - if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0) { - *perror = PA_ERROR_INTERNAL; - return -1; - } - } if (l > length) l = length; @@ -116,9 +121,14 @@ int pa_simple_write(struct pa_simple *p, const void*data, size_t length, int *pe length -= l; } + /* Make sure that no data is pending for write */ + if (iterate(p, 0, perror) < 0) + return -1; + return 0; } int pa_simple_read(struct pa_simple *s, void*data, size_t length, int *perror) { assert(0); } + diff --git a/src/sink.c b/src/sink.c index 79bf1778..4852edcc 100644 --- a/src/sink.c +++ b/src/sink.c @@ -274,8 +274,18 @@ char *pa_sink_list_to_string(struct pa_core *c) { default_sink = pa_sink_get_default(c); for (sink = pa_idxset_first(c->sinks, &index); sink; sink = pa_idxset_next(c->sinks, &index)) { + char ss[PA_SAMPLE_SNPRINT_MAX_LENGTH]; + pa_sample_snprint(ss, sizeof(ss), &sink->sample_spec); assert(sink->monitor_source); - pa_strbuf_printf(s, " %c index: %u, name: <%s>, volume: <0x%04x>, latency: <%u usec>, monitor_source: <%u>\n", sink == default_sink ? '*' : ' ', sink->index, sink->name, (unsigned) sink->volume, pa_sink_get_latency(sink), sink->monitor_source->index); + pa_strbuf_printf( + s, + " %c index: %u\n\tname: <%s>\n\tvolume: <0x%04x>\n\tlatency: <%u usec>\n\tmonitor_source: <%u>\n\tsample_spec: <%s>\n", + sink == default_sink ? '*' : ' ', + sink->index, sink->name, + (unsigned) sink->volume, + pa_sink_get_latency(sink), + sink->monitor_source->index, + ss); } return pa_strbuf_tostring_free(s); diff --git a/src/sinkinput.c b/src/sinkinput.c index aa6f3329..20ab25ea 100644 --- a/src/sinkinput.c +++ b/src/sinkinput.c @@ -85,13 +85,18 @@ char *pa_sink_input_list_to_string(struct pa_core *c) { pa_strbuf_printf(s, "%u sink input(s) available.\n", pa_idxset_ncontents(c->sink_inputs)); for (i = pa_idxset_first(c->sink_inputs, &index); i; i = pa_idxset_next(c->sink_inputs, &index)) { + char ss[PA_SAMPLE_SNPRINT_MAX_LENGTH]; + pa_sample_snprint(ss, sizeof(ss), &i->sample_spec); assert(i->sink); - pa_strbuf_printf(s, " index: %u, name: <%s>, sink: <%u>; volume: <0x%04x>, latency: <%u usec>\n", - i->index, - i->name, - i->sink->index, - (unsigned) i->volume, - pa_sink_input_get_latency(i)); + pa_strbuf_printf( + s, + " index: %u\n\tname: <%s>\n\tsink: <%u>\n\tvolume: <0x%04x>\n\tlatency: <%u usec>\n\tsample_spec: <%s>\n", + i->index, + i->name, + i->sink->index, + (unsigned) i->volume, + pa_sink_input_get_latency(i), + ss); } return pa_strbuf_tostring_free(s); diff --git a/src/socket-client.h b/src/socket-client.h index 76126aee..046cc3a5 100644 --- a/src/socket-client.h +++ b/src/socket-client.h @@ -5,6 +5,8 @@ #include "mainloop-api.h" #include "iochannel.h" +/* It is safe to destroy the calling socket_client object from the callback */ + struct pa_socket_client; struct pa_socket_client* pa_socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port); diff --git a/src/socket-server.h b/src/socket-server.h index d581fa37..dbce172e 100644 --- a/src/socket-server.h +++ b/src/socket-server.h @@ -5,6 +5,8 @@ #include "mainloop-api.h" #include "iochannel.h" +/* It is safe to destroy the calling socket_server object from the callback */ + struct pa_socket_server; struct pa_socket_server* pa_socket_server_new(struct pa_mainloop_api *m, int fd); diff --git a/src/source.c b/src/source.c index 1c97604b..44d7da01 100644 --- a/src/source.c +++ b/src/source.c @@ -111,10 +111,12 @@ char *pa_source_list_to_string(struct pa_core *c) { default_source = pa_source_get_default(c); for (source = pa_idxset_first(c->sources, &index); source; source = pa_idxset_next(c->sources, &index)) { + char ss[PA_SAMPLE_SNPRINT_MAX_LENGTH]; char mo[256] = ""; if (source->monitor_of) - snprintf(mo, sizeof(mo), ", monitor_of: <%u>", source->monitor_of->index); - pa_strbuf_printf(s, " %c index: %u, name: <%s>%s\n", source == default_source ? '*' : ' ', source->index, source->name, mo); + snprintf(mo, sizeof(mo), "\n\tmonitor_of: <%u>", source->monitor_of->index); + pa_sample_snprint(ss, sizeof(ss), &source->sample_spec); + pa_strbuf_printf(s, " %c index: %u\n\tname: <%s>\n\tsample_spec: <%s>%s\n", source == default_source ? '*' : ' ', source->index, source->name, ss, mo); } return pa_strbuf_tostring_free(s); diff --git a/src/sourceoutput.c b/src/sourceoutput.c index 4f9f821b..388f1225 100644 --- a/src/sourceoutput.c +++ b/src/sourceoutput.c @@ -68,11 +68,16 @@ char *pa_source_output_list_to_string(struct pa_core *c) { pa_strbuf_printf(s, "%u source outputs(s) available.\n", pa_idxset_ncontents(c->source_outputs)); for (o = pa_idxset_first(c->source_outputs, &index); o; o = pa_idxset_next(c->source_outputs, &index)) { + char ss[PA_SAMPLE_SNPRINT_MAX_LENGTH]; + pa_sample_snprint(ss, sizeof(ss), &o->sample_spec); assert(o->source); - pa_strbuf_printf(s, " %c index: %u, name: <%s>, source: <%u>\n", - o->index, - o->name, - o->source->index); + pa_strbuf_printf( + s, " %c index: %u\n\tname: <%s>\n\tsource: <%u>\n\tsample_spec: <%u>\n", + o->index, + o->name, + o->source->index, + ss, + ss); } return pa_strbuf_tostring_free(s); diff --git a/src/todo b/src/todo index 5b0d893f..57040cd8 100644 --- a/src/todo +++ b/src/todo @@ -1,7 +1,7 @@ - recording (general, simple, esound, native) - native library/protocol: sync() function - more functions + more functions (esp. latency) - simple library - config parser/cmdline @@ -9,18 +9,20 @@ - move more stuff from module-oss[-dma] to liboss-util - in module-oss: create source first, than sink +- client field for sinkinput/sourceoutput + +- xmms+esound latency testing + - rename files - svn-id and license in every file - documentation - - -- post 0.1 - future cancellation - client-ui - clip cache - autoloading/autounloading -- ldap/rendezvous +- slp/rendezvous - doxygen drivers: diff --git a/src/util.c b/src/util.c index 4ade681a..3111bd5d 100644 --- a/src/util.c +++ b/src/util.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -51,7 +52,7 @@ void pa_peer_to_string(char *c, size_t l, int fd) { ntohs(sa.in.sin_port)); return; } else if (sa.sa.sa_family == AF_LOCAL) { - snprintf(c, l, "UNIX client for %s", sa.un.sun_path); + snprintf(c, l, "UNIX socket client"); return; } @@ -208,3 +209,15 @@ int pa_unix_socket_remove_stale(const char *fn) { return 0; } + +void pa_check_for_sigpipe(void) { + struct sigaction sa; + + if (sigaction(SIGPIPE, NULL, &sa) < 0) { + fprintf(stderr, __FILE__": sigaction() failed: %s\n", strerror(errno)); + return; + } + + if (sa.sa_handler == SIG_DFL) + fprintf(stderr, "polypaudio: WARNING: SIGPIPE is not trapped. This might cause malfunction!\n"); +} diff --git a/src/util.h b/src/util.h index 40095e01..ad9916e7 100644 --- a/src/util.h +++ b/src/util.h @@ -18,4 +18,6 @@ ssize_t pa_loop_write(int fd, const void*data, size_t size); int pa_unix_socket_is_stale(const char *fn); int pa_unix_socket_remove_stale(const char *fn); +void pa_check_for_sigpipe(void); + #endif -- cgit